spark2原理分析-Job物件的實現原理
概述
本節介紹Spark Job物件的實現原理。
Job的基本概念
在《spark2原理分析-Job執行框架概述》一文中介紹了Job的執行框架的基本概念,本節詳細介紹其中的Job物件的實現原理。
上面的文章已經介紹過了,Job是由於執行Action操作函式產生的。Job是提交給 DAGScheduler(Job排程器)來計算action操作結果的頂層工作單元(計算單元)。在Spark程式碼實現層面,Job的實現類是ActiveJob。Job可以有兩種型別:
-
result Job(結果Job)
此類Job通過計算並建立一個ResultStage實體來執行action操作。 -
map-stage Job(對映階段Job)
此類Job會在任何下游的Stage提交之前,為ShuffleMapStage計算map操作的輸出。此類Job用於自適應查詢計劃,在提交後續階段之前檢視map操作的輸出統計資訊。
Spark使用ActiveJob類的finalStage欄位來區分這兩種型別的Job。
ActiveJob
該類代表一個在DAGScheduler執行的Job實體。該類的實現如下:
private[spark] class ActiveJob( val jobId: Int, val finalStage: Stage, val callSite: CallSite, val listener: JobListener, val properties: Properties) { val numPartitions = finalStage match { case r: ResultStage => r.partitions.length case m: ShuffleMapStage => m.rdd.partitions.length } /** Which partitions of the stage have finished */ val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 }
該類的實現程式碼很簡潔,下面介紹一個該類的各個成員的意義:
成員名 | 說明 |
---|---|
jobId | 獨一無二的Job的Id |
finalStage | 該Job計算的Stage,可以是:執行action的ResultStage;或submitMapStage的ShuffleMapStage。 |
callSite | CallSite型別的實體,使用者應用程式在這裡進行Job的初始化 |
listener | 監聽Job結束或Job失敗的事件 |
properties | 與Job排程相關的屬性 |
numPartitions | 需要為該Job計算的分割槽數量 |
finished | 是一個Boolean的陣列,用來表示在該Job中計算完成的分割槽 |
numFinished | 在該Job中已經計算完成的分割槽數量 |
Job的計算
計算Job等同於計算正在執行操作的RDD的分割槽(Partition)。Job中的分割槽數取決於階段Stage的型別 - ResultStage或ShuffleMapStage。
作業Job以單個目標的RDD開始,但最終可以包括其他RDD,這些RDD都是目標RDD的血緣關係圖中(lineage graph)的一部分。
父階段是ShuffleMapStage的例項。
但要注意,在actions操作中並不總是為ResultStages計算所有分割槽,比如:first()和lookup()之類的操作。
Job會跟蹤已經計算了多少分割槽,使用以下變數:
val finished = Array.fill[Boolean](numPartitions)(false)
Map-stage job
此類Job會在提交任何下游Stage之前,為ShuffleMapStage(對於submitMapStage)計算map的輸出檔案。
此類Job還用於自適應查詢計劃/自適應排程,以在提交後續階段之前檢視map輸出的統計資訊。
Result job
計算ResultStage,來執行action操作函式。
總結
本節介紹了Job的實現類,並對其實現原理進行了分析。