Spark 1.6 Source Code Reading: DAGScheduler
This post was written purely by clicking through the code; as you read, I hope you follow along in the source the same way.
The DAGScheduler mainly does the preparation work before tasks are formally handed to TaskSchedulerImpl: it creates jobs, splits the DAG of RDDs into stages, and submits those stages.
SparkContext line 525 creates the DAGScheduler:
_dagScheduler = new DAGScheduler(this)
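To see where this object comes into play, here is a minimal driver-side sketch (my own example, not Spark source): an action such as count() goes through SparkContext.runJob, which delegates to dagScheduler.runJob, and that in turn posts a JobSubmitted event onto the event loop described below.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo application (not part of the Spark source).
object DagSchedulerDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dag-scheduler-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // count() is an action: it calls SparkContext.runJob, which hands the job to the
    // DAGScheduler created above, and a JobSubmitted event ends up on its event loop.
    val n = sc.parallelize(1 to 100, 4).map(_ * 2).count()
    println(s"count = $n")
    sc.stop()
  }
}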
DAGScheduler line 133 onwards defines the main data structures it maintains.
They mainly track the relationship between jobIds and stageIds, the Stages and ActiveJobs themselves, and the cached locations of RDD partitions:
private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

/**
 * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
 * and its values are arrays indexed by partition numbers. Each array value is the set of
 * locations where that RDD partition is cached.
 *
 * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
 */
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

// For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
// every task. When we detect a node failing, we note the current epoch number and failed
// executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
//
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]

private[scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
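To make this bookkeeping concrete, here is a toy sketch (mine, not Spark code; the field names simply mirror the ones above) of how jobIdToStageIds plus the waiting/running stage sets could track a job made of a ShuffleMapStage and a ResultStage:

import scala.collection.mutable.{HashMap, HashSet}

// Toy model of the DAGScheduler bookkeeping above (illustration only).
object StageBookkeepingSketch {
  val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  val waitingStages   = new HashSet[Int]   // stages whose parents aren't done
  val runningStages   = new HashSet[Int]   // stages currently running

  def main(args: Array[String]): Unit = {
    // A hypothetical job 0 made of a ShuffleMapStage (id 0) and a ResultStage (id 1).
    jobIdToStageIds.getOrElseUpdate(0, new HashSet[Int]) ++= Seq(0, 1)

    // The result stage waits until its parent shuffle stage finishes.
    runningStages += 0
    waitingStages += 1

    // Parent stage 0 completes, so stage 1 moves from waiting to running.
    runningStages -= 0
    waitingStages -= 1
    runningStages += 1

    println(s"job 0 -> stages ${jobIdToStageIds(0)}, running = $runningStages")
  }
}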
DAGScheduler line 184 creates the DAGSchedulerEventProcessLoop, which is responsible for receiving and processing events:
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
DAGScheduler line 1588 is the concrete implementation of DAGSchedulerEventProcessLoop:
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
This class extends EventLoop, so let's look at the concrete implementation of EventLoop:
private val eventThread = new Thread(name) {
  // note that this is a daemon thread
  setDaemon(true)

  override def run(): Unit = {
    try {
      // loop until stopped
      while (!stopped.get) {
        // take an event from the queue and process it; block if the queue is empty
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}
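The pattern is a classic single-threaded event loop: producers call post() to put events on a blocking queue, and one daemon thread drains the queue and hands each event to onReceive. A stripped-down sketch of that pattern (my own simplification, not the real EventLoop class) looks like this:

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Minimal event-loop sketch in the style of Spark's EventLoop (illustration only).
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)                          // daemon thread, like the real eventThread
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()      // blocks until an event is available
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => onError(e)
          }
        }
      } catch {
        case _: InterruptedException =>      // stop() interrupts the thread; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = e.printStackTrace()
}

DAGSchedulerEventProcessLoop plugs into this scheme by overriding onReceive, which delegates to the doOnReceive shown next, while DAGScheduler feeds it by posting events such as JobSubmitted.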
DAGScheduler line 1605 lists the events that DAGSchedulerEventProcessLoop can handle:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
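Tying the pieces together, the sketch below (again an illustration, with made-up event types rather than the real DAGSchedulerEvents) wires the SimpleEventLoop from the earlier sketch to a small pattern match in the style of doOnReceive: a producer posts an event, the daemon thread takes it off the queue, and the match picks the handler.

// Toy event types and a loop subclass, mirroring DAGSchedulerEvent / doOnReceive (illustration only).
sealed trait ToyEvent
case class ToyJobSubmitted(jobId: Int) extends ToyEvent
case class ToyJobCancelled(jobId: Int) extends ToyEvent

class ToyEventProcessLoop extends SimpleEventLoop[ToyEvent]("toy-event-loop") {
  override protected def onReceive(event: ToyEvent): Unit = event match {
    case ToyJobSubmitted(id) => println(s"handle submitted job $id")
    case ToyJobCancelled(id) => println(s"handle cancelled job $id")
  }
}

object ToyEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new ToyEventProcessLoop
    loop.start()
    loop.post(ToyJobSubmitted(0))
    loop.post(ToyJobCancelled(0))
    Thread.sleep(200)   // give the daemon thread time to drain the queue
    loop.stop()
  }
}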
At this point the DAGScheduler has been fully constructed.
To summarize:
DAGScheduler maintains a set of data structures that track jobs and stages.
It creates a DAGSchedulerEventProcessLoop to receive and handle the various scheduling events.