1. 程式人生 > >Spark:DAGScheduler原理剖析與原始碼分析

Spark:DAGScheduler原理剖析與原始碼分析

Job觸發流程原理與原始碼解析

在這裡插入圖片描述

wordcount案例解析,來分析Spark Job的觸發流程
程式碼:var linesRDD= sc.textFile('hdfs://')
SparkContext中textFile方法

  /**
   * hadoopFile方法呼叫會建立一個HadoopRDD,其中的元素pair是(key,value)
   * key是hdfs或者文字檔案的每一行的offset,value就是文字行 	
   * 然後,呼叫map方法,過濾掉key,剩下value,最終獲得一個MapPartitionRDD,其內部是一行一行的文字行
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    // hadoop mapreduce TextInputFormat:讀取文字資料的一種方式  LongWritable 讀取到的文字的偏移量(行號) Text 讀到的文字
    // map操作中,pair就是行號和文字資料對映的tuple,pair._2.toString 就是取資料
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }

hadoopFile方法

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    // 建立的HadoopRDD讀取配置檔案的之後,上面已經做了廣播變數,在本機worker上就可以讀到
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

hadoopFile().map() 方法最終呼叫的是RDD.scala的map方法

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

程式碼:var wordsRDD = linesRDD.flatMap(line => line.split(" "))
RDD.scala的flatMap方法

  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

程式碼:var pairsRDD = wordsRDD.map(word => (word,1))
RDD.scala的map方法

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

程式碼:var countRDD = pairsRDD.reduceByKey(_+_)

rdd中是沒有reduceByKey方法的,這裡有一個隱式轉換(相當於Java中的包裝類),程式執行過程中會找RDD.scala中的一個隱式轉換程式碼

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }

最終建立了PairRDDFunctions類,MapPartitionRDD呼叫reduceByKey會觸發scala隱式轉換,在作用域中內尋找隱式轉換,最終將MapPartitionRDD轉換為PairRDDFunctions,呼叫reduceByKey方法。

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

程式碼:countRDD.foreach(count => println(count._1 + ":"+count._2))
RDD.scala的foreach方法

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

有多個runJob的巢狀呼叫

  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    // 核心:呼叫SparkContext初始化建立的DAGSChduler的runJob方法
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

Stage劃分演算法的剖析

由於Spark的運算元構建一般都是鏈式的,這就涉及了要如何進行這些鏈式計算,Spark的策略是對這些運算元,先劃分Stage,然後在進行計算。

在這裡插入圖片描述

演算法總結:會從出發action操作的那個RDD往前倒推,首先會為最後一個RDD建立一個stage(stage1),然後往前倒推的時候,如果發現對某個RDD的寬依賴,那麼就會將寬依賴的那個RDD建立一個新的stage(stage0),那個RDD就是新的stage的最後一個RDD;最後依此類推,繼續往前倒推,根據窄依賴,或者寬依賴,進行stage的劃分,知道所有的RDD全部遍歷完了為止。

Spark Web頁面
在這裡插入圖片描述

原始碼解析
第一步:DAGScheduler類中runJob方法
原始碼地址:org.apache.spark.scheduler.DAGScheduler

  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

第二步:DAGScheduler類中submitJob方法

 private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  taskScheduler.setDAGScheduler(this)

......

  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // JobSubmitted物件給eventProcessLoop(訊息迴圈器)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
//這裡面封裝了哪些Partition要進行計算,joblistener作業監聽等等
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent

......

第三步:eventProcessLoop類中post方法

//雙端佇列,任何一端都可以進行元素的出入
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
        //接收訊息。在這裡並沒有直接實現OnReceive方法,具體方法實現是在DAGScheduler#onReceive
          val event = eventQueue.take() //取出放入的訊息
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

......

  def post(event: E): Unit = {
    eventQueue.put(event)
  }

第三步:onReceive方法

 protected def onReceive(event: E): Unit //抽象方法呼叫子類的實現。

第三步:onReceive方法子類(DAGSchedulerEventProcessLoop)的實現
原始碼地址:org.apache.spark.scheduler.DAGScheduler.DAGSchedulerEventProcessLoop

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

第三步:doOnReceive方法

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

......

}

DAGScheduler啟動一個執行緒EventLoop(訊息迴圈器),不斷地從訊息佇列中取訊息。訊息是通過EventLoop的put方法放入訊息佇列,當EventLoop拿到訊息後會回撥DAGScheduler的OnReceive,進而呼叫doOnReceive方法進行處理

第四步:handleJobSubmitted方法

/**
   *  DAGSchduler的job排程核心入口
   */
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],//引數finalRDD為觸發action操作時最後一個RDD
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    // 使用觸發job的最後一個rdd,建立stage
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //第一步 建立一個結果輸出stage,並加入DAGSchduler內部的記憶體快取中
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
          "than the total number of slots in the cluster currently.")
        // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          new BiFunction[Int, Int, Int] {
            override def apply(key: Int, value: Int): Int = value + 1
          })
        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.
          barrierJobIdToNumTasksCheckFailures.remove(jobId)
          listener.jobFailed(e)
          return
        }

      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    // Job submitted, clear internal data.
    barrierJobIdToNumTasksCheckFailures.remove(jobId)
    // 第二步 用finalStage建立一個job,即這個job的最後一個stage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    //第三步,將job加入記憶體快取中
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    
   //第四步,使用submitStage()方法提交finalStage
   submitStage(finalStage)
  }

第四步:createResultStage方法

  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    //獲取父stages
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage //將Stage的id放入stageIdToStage(HashMap)結構中。
    updateJobIdStageIdMaps(jobId, stage) //更新JobIdStageIdMaps 
    stage 
  }

第五步:submitStage方法,stage劃分演算法的入口

/** Submits stage, but first recursively submits any missing parents. */
  /**
   * 這裡是stage劃分的入口,submitStage與getMissingParentStages方法共同完成了stage的劃分
   *
   */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //呼叫getMissingParentStages()方法,去獲取當前這個stage的父stage,並且按照id進行排序,
        //從小到大的進行排序,從前向後依次計算,這樣做的原因是不同的rdd存在依賴關係
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          //首先提交這個第一個stage,stage0,其餘的stage,此時全部都在waitingStage裡
          submitMissingTasks(stage, jobId.get)
        } else {
          // 遞迴呼叫
          for (parent <- missing) {
            submitStage(parent)
          }
          // 將當前stage放入等待執行的stage佇列
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

第六步:getMissingParentStages方法

/**
   * 獲取某個stage的父stage
   *
   * 如果最後一個rdd的所有依賴都是窄依賴,就不會建立新的shuffle stage
   * 但是,如果stage的rdd是寬依賴,就使用rdd的寬依賴建立新的stage,立即新的stage返回
   *
   */
  private def getMissingParentStages(stage: Stage): List[Stage] = {
    //定義一個hashset來存放stage
    val missing = new HashSet[Stage]
    //儲存已經被訪問的RDD,構建的時候是從後往前回溯的一個過程,回溯過之後就會被儲存起來。
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    //這裡只是快取RDD的資訊,並未真正計算,因為此時我們並沒有partition的資訊。
    val waitingForVisit = new ArrayStack[RDD[_]] //儲存需要被處理的RDD
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd //如果沒有被回溯過,那麼就將此RDD加入HashSet中
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) { //如果分割槽的數量是非空的
          //遍歷RDD依賴
          for (dep <- rdd.dependencies) {
            dep match {
              //那麼使用寬依賴的那個rdd,建立一個stage
              //預設最後一個stage,不是shuffleMap stage,而是ResultStage
              //但是finalStage之前所有的stage,都是shuffleMap stage
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              //如果是窄依賴,那麼將依然的RDD放入棧中
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }

    // 往棧中,加入stage最後的一個rdd
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      //呼叫內部visit方法
      visit(waitingForVisit.pop())
    }
    //返回這個stage的所有的父親節點,便於在後面遞迴的去查詢
    missing.toList
  }

第七步:getOrCreateShuffleMapStage方法

  private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            //建立一個shuffle stage
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

下面我們以下面的圖為例,來詳細敘述Stage的劃分
在這裡插入圖片描述

第一次迴圈:
1.RDD G傳進來,將RDD G壓棧.

waitingForVisit.push(stage.rdd)

2.此時的棧不空,將棧裡面的RDD G彈出,作為引數傳入visit函式內。

while (waitingForVisit.nonEmpty) {
      //呼叫內部visit方法
      visit(waitingForVisit.pop())
}

3.RDD G沒有被訪問過,所以執行if中的程式碼

 val waitingForVisit = new ArrayStack[RDD[_]] //儲存需要被處理的RDD
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {

4.對RDD G進行處理,加入visited中。

visited += rdd

5.遍歷RDD G依賴的父RDD

 for (dep <- rdd.dependencies) {
       dep match {
      //若是 RDD F 建立新的Stage,並將新建立的Stage儲存到missing 中
         case shufDep: ShuffleDependency[_, _, _] =>
         val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
         if (!mapStage.isAvailable) {
                  missing += mapStage
          }
       //若是RDD F 則為和RDD G是一個Stage,然後就將RDD F壓棧
        case narrowDep: NarrowDependency[_] =>
             waitingForVisit.push(narrowDep.rdd)
       }
 }

RDD G依賴兩個RDD:

  1. RDD B的依賴關係是窄依賴,因此,並不會產生新的RDD,只是將RDD B 壓入Stack棧中
  2. RDD F的依賴關係是寬依賴,因此,RDD G 和RDD F會被劃分成兩個Stage,Shuffle依賴的關係資訊儲存在missing 中,並且,RDD F所在的Stage 2是RDD G所在的Stage 3的父Stage
    在這裡插入圖片描述

第二次迴圈:Stack棧中 RDD B
RDD B所依賴的父RDD是RDD A 之間是寬依賴關係,因此要建立一個新的Stage為Stage 1
在這裡插入圖片描述

此時 missing 中存在兩個Stage,分別是 Stage 1與Stage 2

遞迴呼叫Stage 1,A無父RDD,提交stage

 // 遞迴呼叫,直到最初的stage,他沒有父stage
 for (parent <- missing) {
    submitStage(parent)
 }

遞迴呼叫Stage 2,而RDD F的父RDD都是窄依賴,所以不產生新的Stage,均為Stage 2.

stage劃分演算法總結:

  1. 從finalStage倒推
  2. 通過寬依賴,來進行新的stage的劃分
  3. 使用遞迴,優先提交父stage

Task任務本地性演算法

1.在submitMissingTasks中會通過呼叫以下程式碼來獲取任務的本地性。

 //提交stage,為stage建立一批task,task數量與partition數量相同
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    //獲取你要建立的task的數量
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties
    //將stage加入runningStage佇列
    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    //獲得Task的本地性,task的最佳位置計算演算法
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    // If there are tasks to execute, record the submission time of the stage. Otherwise,
    // post the even without the submission time, which indicates that this stage was
    // skipped.
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      var taskBinaryBytes: Array[Byte] = null
      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
    //根據不同的Stage型別建立不同的tasks佇列
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            //為每一個Partition建立ShuffleMapTask,並計算最近位置
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            //對finalStage之外的,都建立ShuffleMapTask
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            //如果不是shuffleMap,那麼就是finalStage 是建立ResultTask的
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      //將Tasks封裝到TaskSet中,並將TaskSet提交給TaskScheduler。
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})")
            // 如果Tasks執行完了,表示該Stage執行完成
          markMapStageJobsAsFinished(stage)
        case stage: ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
  }

2.getPreferredLocsInternal方法

 /**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   *
   * 計算每個task對應Partition最佳位置
   * 從stage的最後一個rdd開始,去找哪個rdd的partition,是被cache了,或者checkpoint
   * 那麼,task的最佳位置,就是快取的 checkpoint的partition的位置
   * 因為這樣的話,task就在哪個節點上執行,不需要計算之前的rdd了
   */
  private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    // 如果已訪問過RDD,即以獲得RDD的TaskLocation則不需再次獲得
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    //尋找當前rdd的partition是否快取了
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    //尋找當前rdd的partitions 是否checkpoint了
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    //最後,遞迴呼叫自己,去尋找rdd 的父rdd,看著對應的partition是否快取或者checkpoint
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }
    //如果這個stage,從最後一個rdd,到最開始的rdd,partition都沒有被快取或者checkpoint
    //那麼,task的最佳位置preferredLocs,就是Nil
    Nil
  }

無論是通過哪種方式獲取RDD分割槽的優先位置,第一次計算的資料來源肯定都是通過RDD的preferredLocations方法獲取的,不同的RDD有不同的preferredLocations實現,但是資料無非就是在三個地方存在,被cache到記憶體、HDFS、磁碟,而這三種方式的TaskLocation都有具體的實現

/**
 * A location that includes both a host and an executor id on that host.
 */
//資料在記憶體中
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}

/**
 * A location on a host.
 */
//資料在磁碟上(非HDFS上)
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

/**
 * A location on a host that is cached by HDFS.
 */
//資料在HDFS上
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}