1. 程式人生 > >Spark2原理分析-DAGScheduler(Stage排程器)的實現原理

Spark2原理分析-DAGScheduler(Stage排程器)的實現原理

概述

本文介紹DAGScheduler的實現原理。通過文章《Spark2原理分析-DAGScheduler(Stage排程器)的基本概念》我們學習了DAGScheduler的基本概念,並瞭解了它的功能。

這篇文章,介紹DAGScheduler的具體實現。為避免篇幅過長,本文先介紹DAGScheduler的一些關鍵框架,和成員變數和函式。並介紹一些關鍵函式的實現。具體的各個框架的詳細實現,放到後面的文章進行講解。

DAGScheduler功能概況

我們先通過檢視該類的關鍵成員變數和成員函式的功能,可以大概瞭解一下該類實現了哪些功能。

DAGScheduler中的成員變數

變數名 型別 說明
nextJobId AtomicInteger 用來生成Job的下一個id
numTotalJobs Int 獲取目前的job總數:nextJobId.get()
jobIdToStageIds HashMap[Int, HashSet[Int]] 通過jobid獲取對應的Stage的id集合
stageIdToStage HashMap[Int, Stage] 通過StageId獲取對應的stage集合
shuffleIdToMapStage HashMap[Int, ShuffleMapStage] shuffle的依賴id和ShuffleMapStage的對應關係的集合
jobIdToActiveJob HashMap[Int, ActiveJob] jobId對應的ActiveJob(正在執行的job)集合
waitingStages HashSet[Stage] 需要執行的Stage的集合
runningStages HashSet[Stage] 正在執行的Stage集合
failedStages HashSet[Stage] 由於執行失敗而必須要重新提交的Stage集合
cacheLocs HashMap[Int, IndexedSeq[Seq[TaskLocation]]] 每個分割槽和被快取的位置的對映集
failedEpoch HashMap[String, Long] 為跟蹤失敗的node
outputCommitCoordinator OutputCommitCoordinator 對任務輸出到HDFS的動作進行認證
eventProcessLoop DAGSchedulerEventProcessLoop 事件處理框架物件
DAGSchedulerEventProcessLoop 處理框架類的定義

DAGScheduler中的成員函式

函式名 返回值型別 說明
getCacheLocs IndexedSeq[Seq[TaskLocation]] 獲取rdd分割槽快取的位置
getOrCreateShuffleMapStage Int 從shuffleIdToMapStage中獲取Stage,若沒有獲取到,則建立一個
createShuffleMapStage ShuffleMapStage 建立一個ShuffleMapStage,它用來產生所給shuffle依賴的分割槽
createResultStage ResultStage 建立一個和所給jobId關聯的ResultStage
getOrCreateParentStages Int 為給定的rdd獲取或建立父stage的列表
getMissingAncestorShuffleDependencies ArrayStack[ShuffleDependency[_, _, _]] 查詢不在shuffleToMapStage結合中的祖先shuffle依賴
getShuffleDependencies HashSet[ShuffleDependency[_, _, _]] 返回給定rdd的shuffle的依賴
getMissingParentStages List[Stage] 返回缺失的Stage
submitJob JobWaiter[U] 向scheduler提交一個由rdd的action操作產生的job
runJob Unit 執行rdd的job,並把結果傳遞給resultHandler
submitMapStage 提交併獨立執行一個shuffle的map階段,返回一個JobWaiter物件
handleJobSubmitted 處理Job提交的事件
handleMapStageSubmitted 處理map的stage提交事件
submitMissingTasks 提交缺失的task,當父stage可用時該函式被呼叫
handleTaskCompletion 處理task完成的事件

DAGScheduler實體的建立和初始化

DAGScheduler的建立

DAGScheduler只在Spark driver中執行,它作為SparkContext初始化過程的一部分。建立實體的步驟如下:

  1. 首先建立任務排程器(taskScheduler)和schedulerBackend
  2. 再建立DAGScheduler實體
  3. 啟動任務排程器

通過下面的SparkContext的實現程式碼可以看到SparkContext是如何初始化DAGScheduler例項的:

class SparkContext(config: SparkConf) extends Logging {
    ...
    // Create and start the scheduler
    // 先建立任務排程器和schedulerBackend實體
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    // 再建立DAGScheduler實體
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    // 啟動任務排程器(後面會重點講到)
    _taskScheduler.start()
    
    ...
}

DAGScheduler的初始化

DAGScheduler初始化的過程如下圖所示:

可以看到,SparkContext會傳遞一個引數this給DAGScheduler建構函式:

    _dagScheduler = new DAGScheduler(this)

我們進入到DAGScheduler內部檢視一下具體的實現,DAGScheduler會呼叫以下建構函式進行構造:

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

理解SparkContext傳遞給DAGScheduler建構函式的引數很重要,因為這是實現DAGScheduler的基礎。DAGScheduler從SparkContext中繼承的引數說明如下:

引數名 型別 說明
sparkContext SparkContext上下文
taskScheduler task排程器
sc.listenerBus 事件監聽匯流排
MapOutputTrackerMaster map輸出跟蹤
BlockManagerMaster 塊管理器
SparkEnv Spark環境資訊

DAGScheduler的事件處理框架

DAGScheduler事件處理物件的建立和啟動

在DAGScheduler類定義中,先構建事件處理框架物件。構建完成後,會呼叫start()函式來啟動事件處理執行緒,程式碼實現如下:

class DAGScheduler(...) {
  ...
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  ...
  eventProcessLoop.start()
}

事件處理框架的實現

上圖示識了DAGScheduler的事件處理框架。在發生事件時DAGScheduler會把事件儲存到一個阻塞的事件佇列中,此時會有一個處理執行緒在佇列的另一端監聽,若發現有事件放到佇列,立即得到通知,並把該事件取出,並呼叫DAGScheduler物件中的事件處理函式進行處理。

DAGScheduler中能夠處理的事件,和對應的處理函式如上圖所示。

shuffleIdToMapStage成員變數

該成員變數是一個HashMap,它表示:從shuffle依賴項ID(shuffle dependency ID)到為該依賴項生成資料的ShuffleMapStage的對映。

shuffle dependency ID -> ShuffleMapStage

僅包括當前正在執行的作業的一部分的階段(當shuffle階段的作業完成時,對映將被刪除,並且shuffle資料的唯一記錄將在MapOutputTracker中)。

該成員變數的定義如下:

private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]

getOrCreateShuffleMapStage

這是DAGScheduler的一個成員函式,它的原型如下:

 private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage 

該函式的功能是:為ShuffleDependency查詢或建立ShuffleMapStage

該函式先在shuffleIdToMapStage中查詢ShuffleMapStage,若找到了則返回,若找不到則建立一個。

getShuffleDependencies

該函式查詢給定RDD的直接父shuffle依賴項。

但該函式不會返回更遠的祖先。 例如,如果C對B有一個shuffle依賴,B對A有一個shuffle依賴,如下:

A<---B<---C

使用rdd C作為引數呼叫此函式,只會返回B <- C依賴項。

該函式的原型如下:

private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]

當DAGScheduler找到或建立缺少的直接父ShuffleMapStages(對於給定RDD的ShuffleDependencies)並且找到給定RDD的所有缺失的shuffle依賴性時,使用getShuffleDependencies。

getMissingParentStages函式

該函式的宣告如下:

private def getMissingParentStages(stage: Stage): List[Stage]

主要功能是:在輸入階段的依賴關係圖中找到缺少的父ShuffleMapStages(使用廣度優先搜尋演算法),並按List返回。

該函式的實現流程如下:

  • (1)從階段的RDD開始,並向上遍歷所有父RDD的樹以查詢未快取的分割槽。
  • (2)遍歷RDD的父依賴項,並根據其型別進行操作,例如:ShuffleDependency或NarrowDependency。
  • (3)對於每個NarrowDependency,getMissingParentStages只是標記要訪問的相應RDD,然後轉到RDD的下一個依賴項或在另一個未訪問的父RDD上工作。
  • (4)對於每個ShuffleDependency,getMissingParentStages都會找到ShuffleMapStage階段。如果ShuffleMapStage不可用,則將其新增到缺失(對映)階段的集合中。

submitJob函式:提交action任務

該函式向scheduler提交Action的job。具體來說,該函式會:建立一個JobWaiter實體,併發送JobSubmitted型別的事件。

該函式的實現邏輯如下:

(1) 獲取引數rdd的分割槽長度,並檢查這些分割槽是否選在引數partitions的集合中。

(2) 增加nextJobId內部工作計數器。
(3) 若分割槽集合引數partitions的長度為0,說明該job執行0個任務,直接返回總任務數為0的JobWaiter物件。
(4) 若分割槽集合不為0,獲取每個分割槽的處理函式的實體
(5) 建立一個JobWaiter物件,並向DAGSchedulerEventProcessLoop發起一個JobSubmitted事件。

當SparkContext提交一個job,且DAGScheduler執行job時,才會呼叫submitJob函式。

處理的總流程圖如下所示:

函式宣告:

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {...}

submitMissingTasks函式

該函式的主要功能是:在Spark Job中提交Stage缺失的任務。該函式的原型如下:

private def submitMissingTasks(stage: Stage, jobId: Int)

該函式的實現邏輯如下:

  • (1)查詢stage的分割槽中缺失的分割槽的id

  • (2)標記stage的狀態為running

  • (3)通知outputCommitCoordinator該stage啟動了

  • (4)獲取缺失分割槽的最合適的位置,若此時沒有致命錯誤的異常,會嘗試建立一個新的階段。

  • (5)在LiveListenerBus上釋出SparkListenerStageSubmitted訊息。

  • (6) 根據階段的型別和ShuffleDependency(對於ShuffleMapStage)或函式(對於ResultStage)對RDD進行序列化,用來向executors分發任務。

  • (7) 若發生異,中止該階段(原因是“任務序列化失敗”,然後是異常)並從階段的內部runningStages集合中刪除該階段。submitMissingTasks退出。

  • (8) 分別為任務的每個缺失分割槽建立一個ShuffleMapTask或ResultTask,分別為ShuffleMapStage或ResultStage。submitMissingTasks使用每個分割槽的首選位置(前面已計算完成)。

  • (9) 記錄stage的pendingPartitions屬性中的(任務的)分割槽。

  • (10) 將任務提交給TaskScheduler執行(具有stage的id,嘗試id,輸入jobId以及帶jobId的ActiveJob的屬性)。

  • (11) 記錄Stage的StageInfo中的提交時間並退出。

  • (12)最後,由於沒有任務要提交執行,submitMissingTasks會提交等待子階段以供執行和退出。

JobWaiter

該物件用來等待DAGScheduler的job完成。當任務完成時,該物件會把結果傳輸給給定的處理函式。

總結

本文介紹了DAGScheduler類的實現,包括物件初始化,事件處理框架,job的提交流程等。但為了不讓整個篇幅過長,每個事件的具體處理的實現會放到後面的文章進行分析。