1. 程式人生 > >spark中job stage task關係

spark中job stage task關係

1.1 例子,美國 1880 - 2014 年新生嬰兒資料統計

  • 目標:用美國 1880 - 2014 年新生嬰兒的資料來做做簡單的統計
  • 資料格式:
    • 每年的新生嬰兒資料在一個檔案裡面
    • 每個檔案的每一條資料格式:姓名,性別,新生人數

1.2 執行流程概覽

上面的 22 行程式碼,就已經把構建一個 spark app 的三大步驟完成了,amazing, right? 今天我們主要講 spark 的執行邏輯,所以我們就以核心的 11 - 16 ,這六行程式碼來作為今天的主線,瞭解瞭解 spark 的原理。

 

可以看到,整個邏輯實際上就用了 sparkContext 的一個函式,rdd 的 3 個 transformation 和 1 個 action。

 

現在讓我們從 WEB UI 上來看看,當我們執行這段程式碼的時候,後臺都發生了什麼。 可以看到,執行這段程式碼的時候,spark 通過分析,優化程式碼,知道這段程式碼需要一個 job 來完成,所以 web ui 上只有一個 job。值得深究的是,這個 job 由兩個 stage 完成,這兩個 state 一共有 66 個 task。

所以,這裡我們就再次理解下 spark 裡,job,stage,task 的概念:

  • job : A job is triggered by an action, like count() or saveAsTextFile(). Click on a job to see information about the stages of tasks inside it. 理解了嗎,所謂一個 job,就是由一個 rdd 的 action 觸發的動作,可以簡單的理解為,當你需要執行一個 rdd 的 action 的時候,會生成一個 job。
  • stage : stage 是一個 job 的組成單位,就是說,一個 job 會被切分成 1 個或 1 個以上的 stage,然後各個 stage 會按照執行順序依次執行。
  • task : A unit of work within a stage, corresponding to one RDD partition。即 stage 下的一個任務執行單元,一般來說,一個 rdd 有多少個 partition,就會有多少個 task,因為每一個 task 只是處理一個 partition 上的資料。從 web ui 截圖上我們可以看到,這個 job 一共有 2 個 stage,66 個 task,平均下來每個 stage 有 33 個 task,相當於每個 stage 的資料都有 33 個 partition [注意:這裡是平均下來的哦,並不都是每個 stage 有 33 個 task,有時候也會有一個 stage 多,另外一個 stage 少的情況,就看你有沒有在不同的 stage 進行 repartition 類似的操作了。

1.3 執行流程之 : job

根據上面的截圖和再次重溫,我們知道這個 spark 應用裡只有一個 job,那就是因為我們執行了一個 collect 操作,即把處理後的資料全部返回到我們的 driver 上,進行後續的畫圖,返回的資料如下圖:

 

1.4 執行流程之 : stage

我們這個 spark 應用,生成了一個 job,這個 job 由 2 個 stage 組成,並且每個 stage 都有 33 個task,說明每個 stage 的資料都在 33 個 partition 上,這下我們就來看看,這兩個 stage 的情況。

stage的劃分是以shuffle操作作為邊界的。也就是說某個action導致了shuffle,就會劃分出兩個stage

 

再次回顧上面那張圖:這下應該就明瞭了,關於兩個 stage 的情況:

  • 第一個 stage,即截圖中 stage id 為 0 的 stage,其執行了sc.wholeTextFiles().map().flatMap().map().reduceByKey() 這幾個步驟,因為這是一個 Shuffle 操作,所以後面會有 Shuffle Read 和 Shuffle Write。具體來說,就是在 stage 0 這個 stage 中,發生了一個 Shuffle 操作,這個操作讀入 22.5 MB 的資料,生成 41.7 KB 的資料,並把生成的資料寫在了硬碟上。
  • 第二個 stage,即截圖中 stage id 為 1 到 stage,其執行了 collect() 這個操作,因為這是一個 action 操作,並且它上一步是一個 Shuffle 操作,且沒有後續操作,所以這裡 collect() 這個操作被獨立成一個 stage 了。這裡它把上一個 Shuffle 寫下的資料讀取進來,然後一起返回到 driver 端,所以這裡可以看到他的 Shuffle Read 這裡剛好讀取了上一個 stage 寫下的資料。