Spark2原理分析-DAGScheduler(Stage排程器)的實現原理
概述
本文介紹DAGScheduler的實現原理。通過文章《Spark2原理分析-DAGScheduler(Stage排程器)的基本概念》我們學習了DAGScheduler的基本概念,並瞭解了它的功能。
這篇文章,介紹DAGScheduler的具體實現。為避免篇幅過長,本文先介紹DAGScheduler的一些關鍵框架,和成員變數和函式。並介紹一些關鍵函式的實現。具體的各個框架的詳細實現,放到後面的文章進行講解。
DAGScheduler功能概況
我們先通過檢視該類的關鍵成員變數和成員函式的功能,可以大概瞭解一下該類實現了哪些功能。
DAGScheduler中的成員變數
變數名 | 型別 | 說明 |
---|---|---|
nextJobId | AtomicInteger | 用來生成Job的下一個id |
numTotalJobs | Int | 獲取目前的job總數:nextJobId.get() |
jobIdToStageIds | HashMap[Int, HashSet[Int]] | 通過jobid獲取對應的Stage的id集合 |
stageIdToStage | HashMap[Int, Stage] | 通過StageId獲取對應的stage集合 |
shuffleIdToMapStage | HashMap[Int, ShuffleMapStage] | shuffle的依賴id和ShuffleMapStage的對應關係的集合 |
jobIdToActiveJob | HashMap[Int, ActiveJob] | jobId對應的ActiveJob(正在執行的job)集合 |
waitingStages | HashSet[Stage] | 需要執行的Stage的集合 |
runningStages | HashSet[Stage] | 正在執行的Stage集合 |
failedStages | HashSet[Stage] | 由於執行失敗而必須要重新提交的Stage集合 |
cacheLocs | HashMap[Int, IndexedSeq[Seq[TaskLocation]]] | 每個分割槽和被快取的位置的對映集 |
failedEpoch | HashMap[String, Long] | 為跟蹤失敗的node |
outputCommitCoordinator | OutputCommitCoordinator | 對任務輸出到HDFS的動作進行認證 |
eventProcessLoop | DAGSchedulerEventProcessLoop | 事件處理框架物件 |
DAGSchedulerEventProcessLoop | 處理框架類的定義 |
DAGScheduler中的成員函式
函式名 | 返回值型別 | 說明 |
---|---|---|
getCacheLocs | IndexedSeq[Seq[TaskLocation]] | 獲取rdd分割槽快取的位置 |
getOrCreateShuffleMapStage | Int | 從shuffleIdToMapStage中獲取Stage,若沒有獲取到,則建立一個 |
createShuffleMapStage | ShuffleMapStage | 建立一個ShuffleMapStage,它用來產生所給shuffle依賴的分割槽 |
createResultStage | ResultStage | 建立一個和所給jobId關聯的ResultStage |
getOrCreateParentStages | Int | 為給定的rdd獲取或建立父stage的列表 |
getMissingAncestorShuffleDependencies | ArrayStack[ShuffleDependency[_, _, _]] | 查詢不在shuffleToMapStage結合中的祖先shuffle依賴 |
getShuffleDependencies | HashSet[ShuffleDependency[_, _, _]] | 返回給定rdd的shuffle的依賴 |
getMissingParentStages | List[Stage] | 返回缺失的Stage |
submitJob | JobWaiter[U] | 向scheduler提交一個由rdd的action操作產生的job |
runJob | Unit | 執行rdd的job,並把結果傳遞給resultHandler |
submitMapStage | 提交併獨立執行一個shuffle的map階段,返回一個JobWaiter物件 | |
handleJobSubmitted | 處理Job提交的事件 | |
handleMapStageSubmitted | 處理map的stage提交事件 | |
submitMissingTasks | 提交缺失的task,當父stage可用時該函式被呼叫 | |
handleTaskCompletion | 處理task完成的事件 |
DAGScheduler實體的建立和初始化
DAGScheduler的建立
DAGScheduler只在Spark driver中執行,它作為SparkContext初始化過程的一部分。建立實體的步驟如下:
- 首先建立任務排程器(taskScheduler)和schedulerBackend
- 再建立DAGScheduler實體
- 啟動任務排程器
通過下面的SparkContext的實現程式碼可以看到SparkContext是如何初始化DAGScheduler例項的:
class SparkContext(config: SparkConf) extends Logging {
...
// Create and start the scheduler
// 先建立任務排程器和schedulerBackend實體
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// 再建立DAGScheduler實體
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
// 啟動任務排程器(後面會重點講到)
_taskScheduler.start()
...
}
DAGScheduler的初始化
DAGScheduler初始化的過程如下圖所示:
可以看到,SparkContext會傳遞一個引數this給DAGScheduler建構函式:
_dagScheduler = new DAGScheduler(this)
我們進入到DAGScheduler內部檢視一下具體的實現,DAGScheduler會呼叫以下建構函式進行構造:
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
this(
sc,
taskScheduler,
sc.listenerBus,
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
sc.env.blockManager.master,
sc.env)
}
理解SparkContext傳遞給DAGScheduler建構函式的引數很重要,因為這是實現DAGScheduler的基礎。DAGScheduler從SparkContext中繼承的引數說明如下:
引數名 | 型別 | 說明 |
---|---|---|
sparkContext | SparkContext上下文 | |
taskScheduler | task排程器 | |
sc.listenerBus | 事件監聽匯流排 | |
MapOutputTrackerMaster | map輸出跟蹤 | |
BlockManagerMaster | 塊管理器 | |
SparkEnv | Spark環境資訊 |
DAGScheduler的事件處理框架
DAGScheduler事件處理物件的建立和啟動
在DAGScheduler類定義中,先構建事件處理框架物件。構建完成後,會呼叫start()函式來啟動事件處理執行緒,程式碼實現如下:
class DAGScheduler(...) {
...
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
...
eventProcessLoop.start()
}
事件處理框架的實現
上圖示識了DAGScheduler的事件處理框架。在發生事件時DAGScheduler會把事件儲存到一個阻塞的事件佇列中,此時會有一個處理執行緒在佇列的另一端監聽,若發現有事件放到佇列,立即得到通知,並把該事件取出,並呼叫DAGScheduler物件中的事件處理函式進行處理。
DAGScheduler中能夠處理的事件,和對應的處理函式如上圖所示。
shuffleIdToMapStage成員變數
該成員變數是一個HashMap,它表示:從shuffle依賴項ID(shuffle dependency ID)到為該依賴項生成資料的ShuffleMapStage的對映。
shuffle dependency ID -> ShuffleMapStage
僅包括當前正在執行的作業的一部分的階段(當shuffle階段的作業完成時,對映將被刪除,並且shuffle資料的唯一記錄將在MapOutputTracker中)。
該成員變數的定義如下:
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
getOrCreateShuffleMapStage
這是DAGScheduler的一個成員函式,它的原型如下:
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage
該函式的功能是:為ShuffleDependency查詢或建立ShuffleMapStage。
該函式先在shuffleIdToMapStage中查詢ShuffleMapStage,若找到了則返回,若找不到則建立一個。
getShuffleDependencies
該函式查詢給定RDD的直接父shuffle依賴項。
但該函式不會返回更遠的祖先。 例如,如果C對B有一個shuffle依賴,B對A有一個shuffle依賴,如下:
A<---B<---C
使用rdd C作為引數呼叫此函式,只會返回B <- C依賴項。
該函式的原型如下:
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]
當DAGScheduler找到或建立缺少的直接父ShuffleMapStages(對於給定RDD的ShuffleDependencies)並且找到給定RDD的所有缺失的shuffle依賴性時,使用getShuffleDependencies。
getMissingParentStages函式
該函式的宣告如下:
private def getMissingParentStages(stage: Stage): List[Stage]
主要功能是:在輸入階段的依賴關係圖中找到缺少的父ShuffleMapStages(使用廣度優先搜尋演算法),並按List返回。
該函式的實現流程如下:
- (1)從階段的RDD開始,並向上遍歷所有父RDD的樹以查詢未快取的分割槽。
- (2)遍歷RDD的父依賴項,並根據其型別進行操作,例如:ShuffleDependency或NarrowDependency。
- (3)對於每個NarrowDependency,getMissingParentStages只是標記要訪問的相應RDD,然後轉到RDD的下一個依賴項或在另一個未訪問的父RDD上工作。
- (4)對於每個ShuffleDependency,getMissingParentStages都會找到ShuffleMapStage階段。如果ShuffleMapStage不可用,則將其新增到缺失(對映)階段的集合中。
submitJob函式:提交action任務
該函式向scheduler提交Action的job。具體來說,該函式會:建立一個JobWaiter實體,併發送JobSubmitted型別的事件。
該函式的實現邏輯如下:
(1) 獲取引數rdd的分割槽長度,並檢查這些分割槽是否選在引數partitions的集合中。
(2) 增加nextJobId內部工作計數器。
(3) 若分割槽集合引數partitions的長度為0,說明該job執行0個任務,直接返回總任務數為0的JobWaiter物件。
(4) 若分割槽集合不為0,獲取每個分割槽的處理函式的實體
(5) 建立一個JobWaiter物件,並向DAGSchedulerEventProcessLoop發起一個JobSubmitted事件。
當SparkContext提交一個job,且DAGScheduler執行job時,才會呼叫submitJob函式。
處理的總流程圖如下所示:
函式宣告:
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {...}
submitMissingTasks函式
該函式的主要功能是:在Spark Job中提交Stage缺失的任務。該函式的原型如下:
private def submitMissingTasks(stage: Stage, jobId: Int)
該函式的實現邏輯如下:
-
(1)查詢stage的分割槽中缺失的分割槽的id
-
(2)標記stage的狀態為running
-
(3)通知outputCommitCoordinator該stage啟動了
-
(4)獲取缺失分割槽的最合適的位置,若此時沒有致命錯誤的異常,會嘗試建立一個新的階段。
-
(5)在LiveListenerBus上釋出SparkListenerStageSubmitted訊息。
-
(6) 根據階段的型別和ShuffleDependency(對於ShuffleMapStage)或函式(對於ResultStage)對RDD進行序列化,用來向executors分發任務。
-
(7) 若發生異,中止該階段(原因是“任務序列化失敗”,然後是異常)並從階段的內部runningStages集合中刪除該階段。submitMissingTasks退出。
-
(8) 分別為任務的每個缺失分割槽建立一個ShuffleMapTask或ResultTask,分別為ShuffleMapStage或ResultStage。submitMissingTasks使用每個分割槽的首選位置(前面已計算完成)。
-
(9) 記錄stage的pendingPartitions屬性中的(任務的)分割槽。
-
(10) 將任務提交給TaskScheduler執行(具有stage的id,嘗試id,輸入jobId以及帶jobId的ActiveJob的屬性)。
-
(11) 記錄Stage的StageInfo中的提交時間並退出。
-
(12)最後,由於沒有任務要提交執行,submitMissingTasks會提交等待子階段以供執行和退出。
JobWaiter
該物件用來等待DAGScheduler的job完成。當任務完成時,該物件會把結果傳輸給給定的處理函式。
總結
本文介紹了DAGScheduler類的實現,包括物件初始化,事件處理框架,job的提交流程等。但為了不讓整個篇幅過長,每個事件的具體處理的實現會放到後面的文章進行分析。