Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend
Spark Runtime裏的主要層次分析,梳理Runtime組件和運行流程,
DAGScheduler
Job=多個stage,Stage=多個同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency
面向stage的切分,切分依據為寬依賴
維護waiting jobs和active jobs。維護waiting stages、active stages和failed stages,以及與jobs的映射關系
主要職能
- 接收提交Job的主入口。
submitJob(rdd, ...)
runJob(rdd, ...)
。在SparkContext
裏會調用這兩個方法。
- 生成一個Stage並提交,接著推斷Stage是否有父Stage未完畢,若有。提交並等待父Stage。以此類推。結果是:DAGScheduler裏添加了一些waiting stage和一個running stage。
- running stage提交後。分析stage裏Task的類型,生成一個Task描寫敘述,即TaskSet。
- 調用
TaskScheduler.submitTask(taskSet, ...)
方法,把Task描寫敘述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會為這個TaskSet分配資源並觸發運行。 DAGScheduler
提交job後。異步返回JobWaiter
對象。能夠返回job運行狀態,能夠cancel job,運行成功後會處理並返回結果
- 處理
TaskCompletionEvent
- 假設task運行成功,相應的stage裏減去這個task。做一些計數工作:
- 假設task是ResultTask,計數器
Accumulator
加一。在job裏為該task置true,job finish總數加一。加完後假設finish數目與partition數目相等。說明這個stage完畢了,標記stage完畢。從running stages裏減去這個stage,做一些stage移除的清理工作
- 假設task是ShuffleMapTask。計數器
Accumulator
MapStatus
類。MapStatus
是ShuffleMapTask
運行完畢的返回,包含location信息和block size(能夠選擇壓縮或未壓縮)。同一時候檢查該stage完畢,向MapOutputTracker
註冊本stage裏的shuffleId和location信息。然後檢查stage的output location裏是否存在空。若存在空。說明一些task失敗了,整個stage又一次提交;否則,繼續從waiting stages裏提交下一個須要做的stage
- 假設task是ResultTask,計數器
- 假設task是重提交,相應的stage裏添加這個task
- 假設task是fetch失敗,立即標記相應的stage完畢。從running stages裏減去。
假設不同意retry。abort整個stage;否則,又一次提交整個stage。
另外,把這個fetch相關的location和map任務信息。從stage裏剔除,從
MapOutputTracker
註銷掉。最後,假設這次fetch的blockManagerId對象不為空,做一次ExecutorLost
處理,下次shuffle會換在還有一個executor上去運行。 - 其它task狀態會由
TaskScheduler
處理,如Exception, TaskResultLost, commitDenied等。
- 假設task運行成功,相應的stage裏減去這個task。做一些計數工作:
- 其它與job相關的操作還包含:cancel job, cancel stage, resubmit failed stage等
其它職能
1. cacheLocations 和 preferLocation
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
TaskScheduler
維護task和executor相應關系,executor和物理資源相應關系。在排隊的task和正在跑的task。
內部維護一個任務隊列。依據FIFO或Fair策略,調度任務。
TaskScheduler
本身是個接口,spark裏僅僅實現了一個TaskSchedulerImpl
。理論上任務調度能夠定制。以下是TaskScheduler
的主要接口:
def start(): Unit
def postStartHook() { }
def stop(): Unit
def submitTasks(taskSet: TaskSet): Unit
def cancelTasks(stageId: Int, interruptThread: Boolean)
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean
主要職能
submitTasks(taskSet)
,接收DAGScheduler
提交來的tasks
- 為tasks創建一個
TaskSetManager
,加入到任務隊列裏。TaskSetManager
跟蹤每一個task的運行狀況,維護了task的很多詳細信息。 - 觸發一次資源的索要。
- 首先。
TaskScheduler
對比手頭的可用資源和Task隊列。進行executor分配(考慮優先級、本地化等策略),符合條件的executor會被分配給TaskSetManager
。 - 然後。得到的Task描寫敘述交給
SchedulerBackend
。調用launchTask(tasks)
。觸發executor上task的運行。task描寫敘述被序列化後發給executor,executor提取task信息。調用task的
run()
方法運行計算。
- 首先。
- 為tasks創建一個
cancelTasks(stageId)
,取消一個stage的tasks
- 調用
SchedulerBackend
的killTask(taskId, executorId, ...)
方法。taskId和executorId在
TaskScheduler
裏一直維護著。
- 調用
resourceOffer(offers: Seq[Workers])
,這是很重要的一個方法,調用者是SchedulerBacnend
,用途是底層資源SchedulerBackend
把空余的workers資源交給TaskScheduler
。讓其依據調度策略為排隊的任務分配合理的cpu和內存資源。然後把任務描寫敘述列表傳回給SchedulerBackend
- 從worker offers裏。搜集executor和host的相應關系、active executors、機架信息等等
- worker offers資源列表進行隨機洗牌,任務隊列裏的任務列表依據調度策略進行一次排序
- 遍歷每一個taskSet,依照進程本地化、worker本地化、機器本地化、機架本地化的優先級順序,為每一個taskSet提供可用的cpu核數,看是否滿足
- 默認一個task須要一個cpu。設置參數為
"spark.task.cpus=1"
- 為taskSet分配資源,校驗是否滿足的邏輯,終於在
TaskSetManager
的resourceOffer(execId, host, maxLocality)
方法裏 - 滿足的話,會生成終於的任務描寫敘述。而且調用
DAGScheduler
的taskStarted(task, info)
方法。通知DAGScheduler
,這時候每次會觸發DAGScheduler
做一次submitMissingStage
的嘗試,即stage的tasks都分配到了資源的話,立即會被提交運行
- 默認一個task須要一個cpu。設置參數為
statusUpdate(taskId, taskState, data)
,還有一個很重要的方法,調用者是SchedulerBacnend
,用途是SchedulerBacnend
會將task運行的狀態匯報給TaskScheduler
做一些決定
- 若
TaskLost
,找到該task相應的executor。從active executor裏移除。避免這個executor被分配到其它task繼續失敗下去。 - task finish包含四種狀態:finished, killed, failed, lost。僅僅有finished是成功運行完畢了。
其它三種是失敗。
- task成功運行完,調用
TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data)
,否則調用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)
。TaskResultGetter
內部維護了一個線程池,負責異步fetch task運行結果並反序列化。默認開四個線程做這件事,可配參數"spark.resultGetter.threads"=4
。
- 若
TaskResultGetter取task result的邏輯
- 對於success task。假設taskResult裏的數據是直接結果數據。直接把data反序列出來得到結果。假設不是。會調用
blockManager.getRemoteBytes(blockId)
從遠程獲取。假設遠程取回的數據是空的,那麽會調用
TaskScheduler.handleFailedTask
,告訴它這個任務是完畢了的可是數據是丟失的。否則。取到數據之後會通知
BlockManagerMaster
移除這個block信息,調用TaskScheduler.handleSuccessfulTask
。告訴它這個任務是運行成功的。而且把result data傳回去。 - 對於failed task。從data裏解析出fail的理由,調用
TaskScheduler.handleFailedTask
。告訴它這個任務失敗了,理由是什麽。
SchedulerBackend
在TaskScheduler
下層。用於對接不同的資源管理系統,SchedulerBackend
是個接口。須要實現的主要方法例如以下:
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler。TaskScheduler依據調度策略分配給排隊的任務嗎,返回一批可運行的任務描寫敘述,SchedulerBackend負責launchTask,即終於把task塞到了executor模型上,executor裏的線程池會運行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
粗粒度:進程常駐的模式。典型代表是standalone模式,mesos粗粒度模式,yarn
細粒度:mesos細粒度模式
這裏討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend
。
維護executor相關信息(包含executor的地址、通信端口、host、總核數。剩余核數),手頭上executor有多少被註冊使用了。有多少剩余,總共還有多少核是空的等等。
主要職能
- Driver端主要通過actor監聽和處理以下這些事件:
RegisterExecutor(executorId, hostPort, cores, logUrls)
。這是executor加入的來源,通常worker拉起、重新啟動會觸發executor的註冊。CoarseGrainedSchedulerBackend
把這些executor維護起來,更新內部的資源信息。比方總核數添加。最後調用一次makeOffer()
,即把手頭資源丟給TaskScheduler
去分配一次。返回任務描寫敘述回來。把任務launch起來。這個
makeOffer()
的調用會出如今不論什麽與資源變化相關的事件中,以下會看到。StatusUpdate(executorId, taskId, state, data)
。task的狀態回調。首先,調用TaskScheduler.statusUpdate
上報上去。然後。推斷這個task是否運行結束了。結束了的話把executor上的freeCore加回去,調用一次makeOffer()
。ReviveOffers
。這個事件就是別人直接向
SchedulerBackend
請求資源,直接調用makeOffer()
。KillTask(taskId, executorId, interruptThread)
。這個killTask的事件。會被發送給executor的actor,executor會處理KillTask
這個事件。StopExecutors
。通知每一個executor,處理StopExecutor
事件。RemoveExecutor(executorId, reason)
。從維護信息中,那這堆executor涉及的資源數減掉。然後調用TaskScheduler.executorLost()
方法,通知上層我這邊有一批資源不能用了,你處理下吧。TaskScheduler
會繼續把executorLost
的事件上報給DAGScheduler
,原因是DAGScheduler
關心shuffle任務的output location。DAGScheduler
會告訴BlockManager
這個executor不可用了,移走它,然後把全部的stage的shuffleOutput信息都遍歷一遍,移走這個executor,而且把更新後的shuffleOutput信息註冊到MapOutputTracker
上,最後清理下本地的CachedLocations
Map。
reviveOffers()
方法的實現。直接調用了
makeOffers()
方法,得到一批可運行的任務描寫敘述。調用launchTasks
。launchTasks(tasks: Seq[Seq[TaskDescription]])
方法。
- 遍歷每一個task描寫敘述。序列化成二進制。然後發送給每一個相應的executor這個任務信息
- 假設這個二進制信息太大,超過了9.2M(默認的akkaFrameSize 10M 減去 默認 為akka留空的200K)。會出錯,abort整個taskSet。並打印提醒增大akka frame size
- 假設二進制數據大小可接受,發送給executor的actor。處理
LaunchTask(serializedTask)
事件。
- 遍歷每一個task描寫敘述。序列化成二進制。然後發送給每一個相應的executor這個任務信息
Executor
Executor是spark裏的進程模型。能夠套用到不同的資源管理系統上。與SchedulerBackend
配合使用。
內部有個線程池,有個running tasks map,有個actor,接收上面提到的由SchedulerBackend
發來的事件。
事件處理
launchTask
。依據task描寫敘述。生成一個
TaskRunner
線程,丟盡running tasks map裏。用線程池運行這個TaskRunner
killTask
。從running tasks map裏拿出線程對象,調它的kill方法。
全文完 :)
Spark Core Runtime分析: DAGScheduler, TaskScheduler, SchedulerBackend