1. 程式人生 > >【Spark】DAGScheduler源代碼淺析

【Spark】DAGScheduler源代碼淺析

under 提交 title 作者 sem lis git lean access

DAGScheduler

DAGScheduler的主要任務是基於Stage構建DAG,決定每個任務的最佳位置

  • 記錄哪個RDD或者Stage輸出被物化
  • 面向stage的調度層。為job生成以stage組成的DAG。提交TaskSet給TaskScheduler運行
  • 又一次提交shuffle輸出丟失的stage

每個Stage內。都是獨立的tasks,他們共同運行同一個computefunction,享有同樣的shuffledependencies。DAG在切分stage的時候是按照出現shuffle為界限的。

DAGScheduler實例化

以下的代碼是SparkContext實例化DAGScheduler的過程:

  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler"
, e) } } }

以下代碼顯示了DAGScheduler的構造函數定義中,通過綁定TaskScheduler的方式創建,當中次構造函數去調用主構造函數來將sc的字段填充入參:

private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock()
)
extends Logging { def this(sc: SparkContext, taskScheduler: TaskScheduler) = { this( sc, taskScheduler, sc.listenerBus, sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], sc.env.blockManager.master, sc.env) } def this(sc: SparkContext) = this(sc, sc.taskScheduler)

作業提交與DAGScheduler操作

Action的大部分操作會進行作業(job)的提交,源代碼1.0版的job提交過程的大致調用鏈是:sc.runJob()–>dagScheduler.runJob–>dagScheduler.submitJob—>dagSchedulerEventProcessActor.JobSubmitted–>dagScheduler.handleJobSubmitted–>dagScheduler.submitStage–>dagScheduler.submitMissingTasks–>taskScheduler.submitTasks
詳細的作業提交運行期的函數調用為:

  1. sc.runJob->dagScheduler.runJob->submitJob
  2. DAGScheduler::submitJob會創建JobSummitted的event發送給內嵌類eventProcessActor(在源代碼1.4中,submitJob函數中,使用DAGSchedulerEventProcessLoop類進行事件的處理)
  3. eventProcessActor在接收到JobSubmmitted之後調用processEvent處理函數
  4. job到stage的轉換,生成finalStage並提交運行。關鍵是調用submitStage
  5. 在submitStage中會計算stage之間的依賴關系,依賴關系分為寬依賴和窄依賴兩種
  6. 假設計算中發現當前的stage沒有不論什麽依賴或者全部的依賴都已經準備完畢,則提交task
  7. 提交task是調用函數submitMissingTasks來完畢
  8. task真正運行在哪個worker上面是由TaskScheduler來管理,也就是上面的submitMissingTasks會調用TaskScheduler::submitTasks
  9. TaskSchedulerImpl中會依據Spark的當前運行模式來創建對應的backend,假設是在單機運行則創建LocalBackend
  10. LocalBackend收到TaskSchedulerImpl傳遞進來的ReceiveOffers事件
  11. receiveOffers->executor.launchTask->TaskRunner.run

DAGScheduler的runJob函數

DAGScheduler.runjob最後把結果通過resultHandler保存返回。


這裏DAGScheduler的runJob函數調用DAGScheduler的submitJob函數來提交任務:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

作業提交的調度

在Spark源代碼1.4.0中,DAGScheduler的submitJob函數不再使用DAGEventProcessActor進行事件處理和消息通信,而是使用DAGSchedulerEventProcessLoop類實例eventProcessLoop進行JobSubmitted事件的post動作。
以下是submitJob函數代碼:

  /**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }

當eventProcessLoop對象投遞了JobSubmitted事件之後,對象內的eventThread線程實例對事件進行處理。不斷從事件隊列中取出事件,調用onReceive函數處理事件。當匹配到JobSubmitted事件後。調用DAGScheduler的handleJobSubmitted函數並傳入jobid、rdd等參數來處理Job。


技術分享

handleJobSubmitted函數

Job處理過程中handleJobSubmitted比較關鍵,該函數主要負責RDD的依賴性分析。生成finalStage,並依據finalStage來產生ActiveJob。
在handleJobSubmitted函數源代碼中。給出了部分凝視:

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      //錯誤處理,告訴監聽器作業失敗。返回....
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // 非常短、沒有父stage的本地操作,比方 first() or take() 的操作本地運行
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        // collect等操作走的是這個過程。更新相關的關系映射,用監聽器監聽,然後提交作業
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()
  }

小結

該篇文章介紹了DAGScheduler從SparkContext中進行實例化,到運行Action操作時提交任務調用runJob函數,進而介紹了提交任務的消息調度,和處理Job函數handleJobSubmitted函數。
因為在handleJobSubmitted函數中涉及到依賴性分析和stage的源代碼內容,於是我計劃在下一篇文章裏進行介紹和源代碼分析。

轉載請註明作者Jason Ding及其出處
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
Github博客主頁(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入我的博客主頁

【Spark】DAGScheduler源代碼淺析