Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (一) DAGScheduler 之 stage 提交
一個Spark Application分為stage級別和task級別的排程,
task來源於stage,所有本文先從stage提交開始講解task任務提交。
架構圖:
Standalone模式提交執行流程圖:
首先寫一個WordCount程式碼(這個程式碼,為了觀察多個suffle操作,我寫了兩個reducebykey 函式)
原始碼:
直接執行程式碼,檢視spark執行程式時,將程式碼劃分stage生成的DAG流程圖
可知: WordCount 在stage劃分的時候,劃分為三個stage
即在程式碼中如下標識:
講TaskScheduler ,先從DAGScheduler中提交任務開始吧,其中在stage劃分task的時候,涉及到一些優化演算法。
org.apache.spark.scheduler.DAGScheduler#handleMapStageSubmitted
這個方法主要有三個部分:
1、建立finalStage
finalStage = getOrCreateShuffleMapStage(dependency, jobId)
2、建立ActiveJob val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
3.提交stage
submitStage(finalStage)
直接看第三步 submitStage
這個是提交stage方法。
裡面是一個遞迴方法,舉例:
在程式碼中, 劃分為三個stage:
stage0 ---> stage1 ---> stage2
submitStage(stage: Stage) 這個方法先傳入的是 finalStage(stage2)
在方法裡面迴圈遞迴, 分別尋找stage的父stage, 即 stage2 找到 stage1 , stage1找到stage0
stage0 沒有父stage 即走 提交方法:
submitMissingTasks(stage: Stage, jobId: Int)
好,接下來,我們看submitMissingTasks
可以看到入參: ShuffleMapStage 0 和 jobId 0
找出當前stage的所有分割槽中,還沒計算完分割槽的stage
ShuffleMapStage
stage.findMissingPartitions獲取需要計算的分割槽,不同的stage有不同的實現:
ResultStage
計算 分割槽的最佳位置 : taskIdToLocations
計算最佳位置的核心方法: getPreferredLocsInternal (遞迴方法)
這個開始傳入的RDD:3,
rdd:3找不到最佳位置, 找到rdd:3的父級rdd:2,
rdd2,找不到最佳位置,找到rdd2的父級rdd1
rdd1有最佳位置,直接返回: 具體的機器地址:
廣播資訊:
為每一個MapStage的分割槽 建立一個 ShuffleMapTask 或者 ResultTask
將ShuffleMapTask 或者 ResultTask 封裝成taskSet,提交Task
在這裡執行的是
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
接著呼叫執行的是:
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
未完,請看下一篇文章:
Spark2.3.2原始碼解析: 10. 排程系統 Task任務提交 (二) TaskScheduler
https://blog.csdn.net/zhanglong_4444/article/details/85249376