1. 程式人生 > >[Spark原始碼解析]DAGScheduler劃分stage

[Spark原始碼解析]DAGScheduler劃分stage

#[Spark原始碼解析]DAGScheduler劃分stage
在 Spark 裡每一個操作生成一個 RDD,RDD 之間連一條邊,最後這些 RDD 和他們之間的邊組成一個有向無環圖,這個就是 DAG,Spark 核心會在需要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是 DAG。有了DAG 圖,Spark 核心下一步的任務就是根據 DAG 圖將計算劃分成 Stage,

在這裡插入圖片描述

上圖:G 與 F 之間是寬依賴,所以把 G 和 F 分為兩個 Stage,而 C 、D 到 F,E 到 F 都是窄依賴,所以 CDEF 最終劃分為一個 Stage2,A 與 B 之間是寬依賴,B 與 G 之間是窄依賴,所以最終,A 被劃分為一個 Stage1,因為 BG 的 stage 依賴於 stage1 和 stage2,所以最終把整個DAG 劃分為一個 stage3,所以說,寬窄依賴的作用就是切割 job,劃分 stage。

Stage:由一組可以平行計算的 task 組成。

##注意:

1,DAGScheduler將Job分解成具有前後依賴關係的多個stage

2,DAGScheduler是根據ShuffleDependency(寬依賴)劃分stage的,

3,stage分為ShuffleMapStage和ResultStage;一個Job中包含一個ResultStage及多個ShuffleMapStage

4,一個stage包含多個tasks,task的個數即該stage的finalRDD的partition數,

5,一個stage中的task完全相同,ShuffleMapStage包含的都是ShuffleMapTask;ResultStage包含的都是ResultTask

##概述:

Spark Application只有遇到action操作時才會真正的提交任務並進行計算,DAGScheduler 會根據各個RDD之間的依賴關係形成一個DAG,並根據ShuffleDependency來進行stage的劃分,stage包含多個tasks,個數由該stage的finalRDD決定,stage裡面的task完全相同,DAGScheduler 完成stage的劃分後基於每個Stage生成TaskSet,並提交給TaskScheduler,TaskScheduler負責具體的task的排程,在Worker節點上啟動task。
在這裡插入圖片描述

Job的提交

以count為例,直接看原始碼都有哪些步驟:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    DAGScheduler#runJob
        DAGScheduler#runJob
            DAGScheduler#runJob
                DAGScheduler#dagScheduler.runJob
                    DAGScheduler#submitJob
                        eventProcessLoop.post(JobSubmitted(**))

eventProcessLoop是一個DAGSchedulerEventProcessLoop(this)物件,可以把DAGSchedulerEventProcessLoop理解成DAGScheduler的對外的功能介面。它對外隱藏了自己內部實現的細節。無論是內部還是外部訊息,DAGScheduler可以共用同一訊息處理程式碼,邏輯清晰,處理方式統一。 eventProcessLoop接收各種訊息並進行處理,處理的邏輯在其doOnReceive方法中:

 private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    ......
}

當提交的是JobSubmitted,便會通過 dagScheduler.handleJobSubmitted處理此事件。

Stage的劃分

在handleJobSubmitted方法中第一件事情就是通過finalRDD向前追溯對Stage的劃分。

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try { 
 //Stage劃分過程是從最後一個Stage開始往前執行的,最後一個Stage的型別是ResultStage
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  //為此job生成一個ActiveJob物件
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job //記錄該job處於active狀態
  activeJobs += job 
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post( //向LiveListenerBus傳送Job提交事件
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage) //提交Stage

  submitWaitingStages()
}

跟進newResultStage方法:

private def newResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) //獲取stage的parentstage
    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    stageIdToStage(id) = stage //將Stage和stage_id關聯
    updateJobIdStageIdMaps(jobId, stage) //跟新job所包含的stage
    stage
  }

直接例項化一個ResultStage,但需要parentStages作為引數,我們看看getParentStagesAndId做了什麼:

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId)
    val id = nextStageId.getAndIncrement()
    (parentStages, id)
  }

獲取parentStages,並返回一個與stage關聯的唯一id,由於是遞迴的向前生成stage,所以最先生成的stage是最前面的stage,越往前的stageId就越小,即父Stage的id最小。繼續跟進getParentStages:

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage] // 當前Stage的所有parent Stage
    val visited = new HashSet[RDD[_]] // 已經訪問過的RDD
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]] //等待訪問的RDD
    def visit(r: RDD[_]) {
      if (!visited(r)) { //若未訪問過
        visited += r  //標記已被訪問
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) { //遍歷其所有依賴
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => //若為寬依賴,則生成新的Stage,shuffleMapstage
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ => //若為窄依賴(歸為當前Stage),壓入棧,繼續向前迴圈,直到遇到寬依賴或者無依賴
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd) //將當前rdd壓入棧
    while (waitingForVisit.nonEmpty) { //等待訪問的rdd不為空時繼續訪問
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

通過給定的RDD返回其依賴的Stage集合。通過RDD每一個依賴進行遍歷,遇到窄依賴就繼續往前遍歷,遇到ShuffleDependency便通過getShuffleMapStage返回一個ShuffleMapStage物件新增到父Stage列表中。可見,這裡的parentStage是Stage直接依賴的父stages(parentStage也有自己的parentStage),而不是整個DAG的所有stages。繼續跟進getShuffleMapStage的實現:

private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage //若已經在shuffleToMapStage存在直接返回Stage
      case None => //不存在需要生成新的Stage
        //為當前shuffle的父shuffle都生成一個ShuffleMapStage
       getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleToMapStage.contains(dep.shuffleId)) {
            shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) //跟新shuffleToMapStage對映
          }
        }
        // 為當前shuffle生成新的Stage
        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage
        stage
    }
  }

先從shuffleToMapStage根據shuffleid獲取Stage,若未獲取到再去計算,第一次都肯定為None,我們先看getAncestorShuffleDependencies幹了什麼:

 private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val parents = new Stack[ShuffleDependency[_, _, _]] // 當前shuffleDependency所有的祖先ShuffleDependency(不是直接ShuffleDependency)
    val visited = new HashSet[RDD[_]] // 已經被訪問過的RDD
    // 等待被訪問的RDD
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) { //未被訪問過
        visited += r //標記已被訪問
        for (dep <- r.dependencies) { //遍歷直接依賴
          dep match {
            case shufDep: ShuffleDependency[_, _, _] => 
              if (!shuffleToMapStage.contains(shufDep.shuffleId)) { // 若為shuffleDependency並且還沒有對映,則新增到parents 
                parents.push(shufDep)
              }
            case _ =>
          }
          waitingForVisit.push(dep.rdd)  //即使是shuffleDependency的rdd也要繼續遍歷
        }
      }
    }

    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents
  }

貌似和getParentStages方法很像,區別是這裡獲取的所有祖先ShuffleDependency,而不是直接父ShuffleDependency。

為當前shuffle的父shuffle都生成一個ShuffleMapStage後再通過newOrUsedShuffleStage獲取當前依賴的shuffleStage,再和shuffleid關聯起來,看newOrUsedShuffleStage的實現:

private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd //依賴對應的rdd
    val numTasks = rdd.partitions.length //分割槽個數
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite) //返回當前rdd的shufflestage
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    //如果當前shuffle已經在MapOutputTracker中註冊過,也就是Stage已經被計算過,從MapOutputTracker中獲取計算結果
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i => // 更新Shuffle的Shuffle Write路徑
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else { //還沒有被註冊計算過
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)  //註冊
    }
    stage
  }

繼續看newShuffleMapStage:

private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId) //獲取parentstages即stageid
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      firstJobId, callSite, shuffleDep) //例項化一個shuffleStage物件

    stageIdToStage(id) = stage //Stage和id關聯
    updateJobIdStageIdMaps(firstJobId, stage) //跟新job所有的Stage
    stage
  }

怎麼和newResultStage極其的相似?是的沒錯,這裡會生成ShuffleStage,getParentStagesAndId裡面的實現就是一個遞迴呼叫。

由finalRDD往前追溯遞迴生成Stage,最前面的ShuffleStage先生成,最終生成ResultStage,至此,DAGScheduler對Stage的劃分已經完成。
本文參照UFO和牛肉圓粉不加蔥發表的文章