1. 程式人生 > >Spark排程機制:4)階段劃分

Spark排程機制:4)階段劃分

階段劃分是作業排程過程的關鍵所在,首先探討下Spark是如何進行階段劃分的。

一個階段劃分的例子如下圖所示,用虛線表示一個階段,虛線框內所有的RDD都是為了實現該階段而需要被計算的資料。整個作業最後一個RDD的所有分割槽資料被計算完畢對於的階段就是所求的末階段。

沿著RDD的依賴關係往前進行深度優先遍歷,若遇到一個Shuffle依賴,依賴的每一個父RDD所有分割槽資料都計算完畢可以分別對應一個階段,且都是當前階段的父階段,繼續沿著父RDD往前遍歷,若遇到一個窄依賴,則直接往前遍歷,直到當前所有的依賴關係都被遍歷才返回上一層,通過這個過程,最後會得到一張DAG。DAG的最終階段稱之為結果階段(Result Stage),其餘的階段稱為ShuffleMap階段。(簡單區分窄依賴和Shuffle依賴,看父RDD是否存在一個分割槽有大於1條線出去,若有則為Shuffle依賴)


以上圖為例,Stage3是結果階段,沿著RDD的依賴關係,從G向前遍歷。(明確一點Spark階段劃分是包含式的)

首先看A->B->G這條路徑,B->G是窄依賴,繼續向前到A->B是Shuffle依賴,B的父RDD-A所有分割槽資料被計算完成可以視為一個階段,所以RDD_A可以視為一個階段Stage1。

再看C->D->F->G這條路徑,F->G是shuffle依賴,G的父RDD-F所有分割槽資料被計算完成可以視為一個階段,也就是下面的整體。由於其他的路徑都是窄依賴,因此只有一個階段Stage2。

如上圖所示,將Shuffle依賴作為兩個階段的分割點,並記錄二者之間的階段依賴關係,這部分的功能在newResultStage

方法中實現

  private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
可以看到,newResultStage函式內部先呼叫getParentStagesAndId獲得父輩階段集合parentStages和階段唯一標識ID,parentStages中的每一個階段又儲存了與其父輩階段的關係
  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId)
    val id = nextStageId.getAndIncrement()
    (parentStages, id)
  }
getParentStages是一個比較複雜的堆疊遞迴過程,對於每一個階段的父階段,都會將其封裝成一個Stage物件,並新增到parentStages中。換句話說,parentStages得到的實際上就是除了當前階段在內的DAG圖。
   //遞迴構建DAG圖,結果儲存在parents中
  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

至此,DAG排程已經完成了階段劃分的工作,並把任務集交付給任務排程器,具體可參看下一章節:Spark排程機制:5)任務排程