1. 程式人生 > >spark2原理分析-Job物件的實現原理

spark2原理分析-Job物件的實現原理

概述

本節介紹Spark Job物件的實現原理。

Job的基本概念

《spark2原理分析-Job執行框架概述》一文中介紹了Job的執行框架的基本概念,本節詳細介紹其中的Job物件的實現原理。

在這裡插入圖片描述

上面的文章已經介紹過了,Job是由於執行Action操作函式產生的。Job是提交給 DAGScheduler(Job排程器)來計算action操作結果的頂層工作單元(計算單元)。在Spark程式碼實現層面,Job的實現類是ActiveJob。Job可以有兩種型別:

  • result Job(結果Job)

    此類Job通過計算並建立一個ResultStage實體來執行action操作。

  • map-stage Job(對映階段Job)

    此類Job會在任何下游的Stage提交之前,為ShuffleMapStage計算map操作的輸出。此類Job用於自適應查詢計劃,在提交後續階段之前檢視map操作的輸出統計資訊。

Spark使用ActiveJob類的finalStage欄位來區分這兩種型別的Job。

ActiveJob

該類代表一個在DAGScheduler執行的Job實體。該類的實現如下:

private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  val numPartitions = finalStage match {
    case r: ResultStage => r.partitions.length
    case m: ShuffleMapStage => m.rdd.partitions.length
  }

  /** Which partitions of the stage have finished */
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0
}

該類的實現程式碼很簡潔,下面介紹一個該類的各個成員的意義:

成員名 說明
jobId 獨一無二的Job的Id
finalStage 該Job計算的Stage,可以是:執行action的ResultStage;或submitMapStage的ShuffleMapStage。
callSite CallSite型別的實體,使用者應用程式在這裡進行Job的初始化
listener 監聽Job結束或Job失敗的事件
properties 與Job排程相關的屬性
numPartitions 需要為該Job計算的分割槽數量
finished 是一個Boolean的陣列,用來表示在該Job中計算完成的分割槽
numFinished 在該Job中已經計算完成的分割槽數量

Job的計算

計算Job等同於計算正在執行操作的RDD的分割槽(Partition)。Job中的分割槽數取決於階段Stage的型別 - ResultStage或ShuffleMapStage。

作業Job以單個目標的RDD開始,但最終可以包括其他RDD,這些RDD都是目標RDD的血緣關係圖中(lineage graph)的一部分。

父階段是ShuffleMapStage的例項。

但要注意,在actions操作中並不總是為ResultStages計算所有分割槽,比如:first()和lookup()之類的操作。

Job會跟蹤已經計算了多少分割槽,使用以下變數:

val finished = Array.fill[Boolean](numPartitions)(false)

在這裡插入圖片描述

Map-stage job

此類Job會在提交任何下游Stage之前,為ShuffleMapStage(對於submitMapStage)計算map的輸出檔案。

此類Job還用於自適應查詢計劃/自適應排程,以在提交後續階段之前檢視map輸出的統計資訊。

Result job

計算ResultStage,來執行action操作函式。

總結

本節介紹了Job的實現類,並對其實現原理進行了分析。