1. 程式人生 > >Spark原始碼《三》Stage劃分

Spark原始碼《三》Stage劃分

當發生shuffle時,sc.runJob-->DAGScheduler.runJob-->submitStage(),提交stage時,

會首先判斷是否有未執行的父stage,如果沒有呼叫submitMissingTasks提交stage

如果有則呼叫submitStage()先提交父stage

1.sc.runJob

當有action運算元時,會呼叫sc.runJob方法,下圖是在原始碼中搜sc.runJob的結果

我們去SparkContext看下runJob方法:

def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    //fun:引數為迭代器
    runJob(rdd, func, 0 until rdd.splits.size, false)
  }
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
    //fun:引數為TaskContext例項和迭代器
    runJob(rdd, func, 0 until rdd.splits.size, false)
  }
def runJob[T, U: ClassManifest](
      rdd: RDD[T],
      func: Iterator[T] => U, 
      partitions: Seq[Int],
      allowLocal: Boolean
      ): Array[U] = {
    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
  }
def runJob[T, U: ClassManifest](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean
      ): Array[U] = {
    logInfo("Starting job...")
    val start = System.nanoTime//提供相對精確的計時
    val result = scheduler.runJob(rdd, func, partitions, allowLocal)
    logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
    result
  }

可以看到有四個過載的方法,但最終都會呼叫第四個,從第四個可以看到呼叫了scheduler.runJob方法

  private var scheduler: Scheduler = {
    //正則表示式
    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
    master match {
      case "local" => //如果是local,建立一個執行緒的本地排程器,失敗重試
        new LocalScheduler(1, 0)
      case LOCAL_N_REGEX(threads) => //如果是local[n],建立n個執行緒的本地排程器
        new LocalScheduler(threads.toInt, 0)
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>//如果是local[n,m],m為重試次數
        new LocalScheduler(threads.toInt, maxFailures.toInt)
      case _ =>//否則建立mesos排程器
        MesosNativeLibrary.load()
        new MesosScheduler(this, master, frameworkName)
    }
  }

2.DAGScheduler.runJob

因為MesosScheduler和LocalScheduler都繼承了DAGScheduler,直接去看DAGScheduler的runJob

程式碼有點多,就貼重要的出來

override def runJob[T, U](
      finalRdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean)
      (implicit m: ClassManifest[U]): Array[U] = {
    lock.synchronized {
       
    ....定義的變數,陣列,集合

    submitStage(finalStage)

    ....

}

 

接下來看submitStage()方法,

該方法主要先判斷waiting(HashSet儲存等待執行的stage)和running(HashSet儲存正在執行的stage)是否包含該stage,

如果都不包含,則呼叫getMissingParentStages獲取該stage還未執行的父stage,

如果沒有未執行的父stage,呼叫submitMissingTasks提交stage,並將stage加入running列表

如果有未執行的父stage,先提交父stage執行,並將stage加入waiting

def submitStage(stage: Stage) {
        if (!waiting(stage) && !running(stage)) {
          val missing = getMissingParentStages(stage)
          if (missing == Nil) {
            logInfo("Submitting " + stage + ", which has no missing parents")
            submitMissingTasks(stage)
            running += stage
          } else {
            for (parent <- missing) {
              submitStage(parent)
            }
            waiting += stage
          }
        }
      }

 

接下來先看getMissingParentStages方法,該方法主要用於獲取stage未執行的父stage,劃分stage

visit()方法首先遍歷未被劃分stage的rdd的依賴,

如果是shuffle依賴,呼叫getShuffleMapStage獲取父stage,

如果是窄依賴,繼續呼叫visit()方法,直到發生shuffle

可以看出stage與父stage劃分的依賴就是是否發生shuffle

  def getMissingParentStages(stage: Stage): List[Stage] = {

    val missing = new HashSet[Stage]//儲存未執行的父stage
    val visited = new HashSet[RDD[_]]//儲存已經劃分stage的RDD

    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {//首先判斷是否劃分過
        visited += rdd//新增到集合
        val locs = getCacheLocs(rdd)
        for (p <- 0 until rdd.splits.size) {//遍歷分割槽
          if (locs(p) == Nil) {
            for (dep <- rdd.dependencies) {//遍歷依賴
              dep match {
                case shufDep: ShuffleDependency[_,_,_] =>//shuffle依賴
                  val stage = getShuffleMapStage(shufDep)//獲取父stage
                  if (!stage.isAvailable) {
        //isAvailable判斷為真的條件是:無父stage並且無shuffle依賴
                    missing += stage//將stage加入為執行的父stage佇列
                  }
                case narrowDep: NarrowDependency[_] =>//窄依賴
                  visit(narrowDep.rdd)//父rdd繼續呼叫visit方法
              }
            }
          }
        }
      }
    }

    visit(stage.rdd)
    missing.toList
  }

 

接下來看getShuffleMapStage()方法,該方法是發生shuffle時獲取父stage,

shuffleToMapStage是HashMap儲存(shuffleId,stage)鍵值對,首先取key為該shuffleId的值

有值,直接返回取到的stage

無值,新建Stage並將該stage加入shuffleToMapStage中

def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
    shuffleToMapStage.get(shuf.shuffleId) match {
      case Some(stage) => stage
      case None =>
        val stage = newStage(shuf.rdd, Some(shuf))
        shuffleToMapStage(shuf.shuffleId) = stage
        stage
    }
  }