basic spark or spark essentials-02(notes)
阿新 • • 發佈:2017-10-07
shuff cal 嚴格 存在 event notes clas one 否則 submitjob::做了什麽
1含有dagScheduler的runJob函數的runJob是入口,並且是堵塞的操作,即直到Spark完成Job的運行之前,rdd.doCheckpoint()是不會執行的。堵塞在3的waiter.awaitResult()操作,即submitJob會返回一個waiter對象,而awaitResult()就堵塞了。其中resultHandler參數是下面runjob構造的回調函數,這裏是沒有返回值的。
2runjob,構造Array,並將函數對象"(index, res) => results(index) = res"繼續傳遞給runJob函數(即在runJob添加一個回調函數,將runJob的運行結果保存到Array),等待runJob函數運行結束,將results返回。
3最終導致了DAGScheduler中的submitJob中,給這次job分配了一個jobID, 通過創建了一個JobWaiter對象,返回給1中。調用eventProcessLoop .post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))向DAG調度器 發送一個case class JobSubmitted的消息。
4消息包包含:jobId,rdd,func2,partitions.toArray,callSite/properties:個人不是很感興趣,姑且理解為不重要的
waiter就是上面創建的JobWaiter對象,這個很重要,因為這個對象封裝了幾個重要的參數:jobId:Job編號,partitions.size:分區編號(它需要接受到partitions.size個歸屬於jobid的task成功運行的結果,並通過resultHandler來將這些task運行結果回調給的Array),resultHandler:回調函數
5消息循環器 會不斷的檢查有沒有消息要處理,並調用handlerjobsubmitted來處理該消息,開始劃分stages。HandleJobSubmitted 生成finalStage後,就會為該Job生成一個ActiveJob,保存當前Job的一些信息,同時調用submitStage來提交Stage。
(這裏Stage的劃分是對一個Job裏面一系列RDD轉換和動作進行劃分。首先job是因動作而產生,因此每個job肯定都有一個ResultStage,否則job就不會啟動。其次,如果Job內部RDD之間存在寬依賴,Spark會針對它產生一個中間Stage,即為ShuffleStage,嚴格來說應該是ShuffleMapStage,這個stage是針對父RDD而產生的, 相當於在父RDD上做一個父rdd.map().collect()的操作。ShuffleMapStage生成的map輸入,對於子RDD,如果檢測到所自己所“寬依賴”的stage完成計算,就可以啟動一個shuffleFectch, 從而將父RDD輸出的數據拉取過程,進行後續的計算。因此一個Job由一個ResultStage和多個ShuffleMapStage組成。
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite))
(這裏的newResultStage是如何劃分task(一個partition就是一個task)?)
6首先是在當前的rdd上調用getParentStagesAndId來生成父Stage,父Stages是一個列表
然後就創建一個Stage對象,並更新Stage和job之間的關系.
stage類:
ShuffleMapStage:
shuffleDep:該stage生成的原因
parents:父stage列表
ShuffleMapStage.class
getParentStages 的具體的實現使用的 是圖算法裏面的 廣度遍歷。
RDD[_]是表示任何類型的RDD
basic spark or spark essentials-02(notes)