1. 程式人生 > >spark job提交過程

spark job提交過程

Driver的任務提交過程

這裡寫圖片描述
1、Driver程式的程式碼執行到action操作,觸發了SparkContext的runJob方法。
2、SparkContext呼叫DAGScheduler的runJob函式。
3、DAGScheduler把Job劃分stage,然後把stage轉化為相應的Tasks,把Tasks交給TaskScheduler。
4、通過TaskScheduler把Tasks新增到任務隊列當中,交給SchedulerBackend進行資源分配和任務排程。
5、排程器給Task分配執行Executor,ExecutorBackend負責執行Task。

Spark排程管理

本文主要介紹在單個任務內Spark的排程管理,Spark排程相關概念如下:

Task(任務):單個分割槽資料及上的最小處理流程單元。
TaskSet(任務集):由一組關聯的,但互相之間沒有Shuffle依賴關係的任務所組成的任務集。
Stage(排程階段):一個任務集對應的排程階段。
Job(作業):有一個RDD Action生成的一個或多個排程階段所組成的一次計算作業。
Application(應用程式):Spark應用程式,由一個或多個作業組成。
各概念間的邏輯關係如下圖所示:
這裡寫圖片描述
resource_manager

Spark的排程管理模組中,最重要的類是DAGScheduler和TaskScheduler,TaskScheduler負責每個具體任務的實際物理排程,DAGScheduler負責將作業拆分成不同階段的具有依賴關係的多批任務,可以理解為DAGScheduler負責任務的邏輯排程。Spark排程管理示意圖如下:
Spark排程管理示意圖

排程階段的拆分

一個Spark任務提交後,DAGScheduler從RDD依賴鏈末端的RDD出發,遍歷整個RDD依賴鏈,將Job分解成具有前後依賴關係的多個stage。DAGScheduler是根據ShuffleDependency劃分stage的,也就是說當某個RDD的運算需要將資料進行shuffle操作時,這個包含了shuffle依賴關係的RDD將被用來作為輸入資訊,構建一個新的排程階段。以此為依據劃分排程階段,可以確保有依賴關係的資料能夠按照正確的順序得到處理和運算。

如何進行Stage劃分?下圖給出的是對應Spark應用程式程式碼生成的Stage。它的具體劃分依據是根據RDD的依賴關係進行,在遇到寬依賴時將兩個RDD劃分為不同的Stage。

這裡寫圖片描述

從上圖中可以看到,RDD G與RDD F間的依賴是寬依賴,所以RDD F與 RDD G被劃分為不同的Stage,而RDD G 與 RDD 間為窄依賴,因此 RDD B 與 RDD G被劃分為同一個Stage。通過這種遞迴的呼叫方式,將所有RDD進行劃分。

Stage劃分演算法

由於Spark的運算元構建一般都是鏈式的,這就涉及了要如何進行這些鏈式計算,Spark的策略是對這些運算元,先劃分Stage,然後在進行計算。

由於資料是分散式的儲存在各個節點上的,所以為了減少網路傳輸的開銷,就必須最大化的追求資料本地性,所謂的資料本地性是指,在計算時,資料本身已經在記憶體中或者利用已有快取無需計算的方式獲取資料。

Stage劃分演算法思想

(1)一個Job由多個Stage構成

一個Job可以有一個或者多個Stage,Stage劃分的依據就是寬依賴,產生寬依賴的運算元:reduceByKey、groupByKey等等

(2)根據依賴關係,從前往後依次執行多個Stage

SparkApplication 中可以因為不同的Action觸發眾多的Job,也就是說一個Application中可以有很多的Job,每個Job是有一個或者多個Stage構成,後面的Stage依賴前面的Stage,也就是說只有前面的Stage計算完後,後面的Stage才會執行。

(3)Stage的執行時Lazy級別的

所有的Stage會形成一個DAG(有向無環圖),由於RDD的Lazy特性,導致Stage也是Lazy級別的,只有遇到了Action才會真正發生作業的執行,在Action之前,Spark框架只是將要進行的計算記錄下來,並沒有真的執行。

排程階段的提交

在劃分Stage的步驟中會得到一個或多個有依賴關係的Stage,其中直接觸發作業的RDD關聯的排程階段被稱為FinalStage,DAGScheduler從FinalStage開始生成一個Job。Job和Stage的關係儲存在一個對映表中,用於在該排程階段全部完成時做一些後續處理,如報告狀態、清理作業相關資料等。

具體提交一個Stage時,首先判斷其依賴的所有父Stage的結果是否可用。如果所有父Stage的結果都可用,則提交該Stage。如果有任何一個父Stage的結果不可用,則嘗試迭代提交當前不可用的父Stage。在迭代過程中,父Stage還未執行的Stage都被放到等待佇列中,等待將來被提交。

下圖是一個具有四個排程階段的Job的Stage提交順序:
這裡寫圖片描述

當一個屬於中間過程排程階段的任務(這種型別的任務所對應的類為ShuffleMapTask)完成後,DAGScheduler會檢查對應排程階段的所有任務是否都完成了。如果完成了,則DAGScheduler將重新掃描一次等待列表中所有的Stage,檢查它們是否還有依賴的Stage沒有完成。如果所有依賴的Stage都已執行完畢,則提交該Stage。

任務結果的獲取

根據任務結果的大小不同,ResultTask返回的結果分為兩中形式:

如果結果足夠小,則直接放在DirectTaskResult物件內。
如果超過特定尺寸(預設約10MB),則在Executor端會將DirectTaskResult序列化,將序列化的結果作為一個數據塊存放在BlockManager中,然後將BlockManager返回的BlockId放在IndirectTaskResult物件中返回給TaskScheduler,TaskScheduler進而呼叫TaskResultGetter將IndirectTaskResult中的BlockId取出並通過BlockManager最終取得對應的DirectTaskResult。