1. 程式人生 > >Spark任務執行流程

Spark任務執行流程

在這裡插入圖片描述 這是Spark官方給的圖,大致意思就是:

四個步驟 1.構建DAG(有向無環圖)(呼叫RDD上的方法) 2.DAGScheduler將DAG切分Stage(切分的依據是Shuffle),將Stage中生成的Task以TaskSet的形式給TaskScheduler 3.TaskScheduler排程Task(根據資源情況將Task排程到相應的Executor中) 4.Executor接收Task,然後將Task丟入到執行緒池中執行

DAG 有向無環圖(資料執行過程,有方法,無閉環) 在這裡插入圖片描述 雖然是程式的起點和終點都是hdfs,但是不是同一個檔案,如果圖中的out檔案存在的話就會報錯。

DAG描述多個RDD的轉換過程,任務執行時,可以按照DAG的描述,執行真正的計算(資料被操作的一個過程)

DAG是有邊界的:開始(通過SparkContext建立的RDD),結束(觸發Action,呼叫run Job就是一個完整的DAG)

一個RDD只是描述了資料計算過程中的一個環節,而DAG由一到多個RDD組成,描述了資料計算過程中的所有環節(過程)

一個Spark Application中是有多少個DAG:一到多個(取決於觸發了多少次Action)

為什麼要切分Stage? 一個複雜的業務邏輯(將多臺機器上具有相同屬性的資料聚合到一臺機器上:shuffle) 如果有shuffle,那麼就意味著前面階段產生的結果後,才能執行下一個階段,下一個階段的計算要依賴上一個階段的資料。 在同一個Stage中,會有多個運算元,可以合併在一起,我們稱其為pipeline(流水線:嚴格按照流程、順序執行)

在這裡插入圖片描述 Spark執行時,首先啟動一個客戶端(Driver),也可以時spark-shell客戶端

spark-submit --master spark://hadoop-master:7077 --executor-memory 4g --total-executor-cores 12

1,客戶端和Master建立連結並且申請資源,每個executor需要4g記憶體,總共需要12核 2,master進行資源排程(節點向master註冊的時候,會將自己的資源情況一併提交給master) 3,master和worker進行RPC通訊,啟動executor 4,啟動各個worker節點上的executor 5,executor和Driver端進行通訊 6,RDD觸發Action後,會根據最後這個RDD往前推斷依賴關係(寬依賴或者窄依賴),遇到Shuffle就切分Stage,會遞迴切分,遞迴的出口是RDD沒有父RDD 7,DAGScheduler切分完Stage後,會進行提交Stage,先提交前面的Stage,前面的Stage執行完之後再提交後面的Stage,每一個stage都會產生很多業務邏輯相同的Task,然後以TaskSet的形式將task傳遞給TaskScheduler,然後TaskScheduler將Task進行序列化,根據資源情況,將task傳送給Executor 8,將Driver端產生的task傳送給executor 9,executor在接收到task之後,先將task進行反序列化,然後將task用一個實現了runnable介面的實現類包裝起來,然後將該包裝類丟入執行緒池,包裝類實現的run方法就會被執行,進而呼叫task的計算邏輯。

以上就是Spark的執行流程。