Spark2.4.0屏障排程器
前幾天,浪尖發了一篇文章,講了Spark 2.4釋出更新情況:
其中,就有一項說到Spark 為了支援深度學習而引入的屏障排程器。本文就詳細講講。
基於訊息傳遞結構的計算模型和Spark計算模型是有很大區別。在Spark 內部,每個Stage的某一個一個task不會依賴於相同Stage任何其他的task,因此,Spark的task 可以被獨立進行排程執行。為了在Spark中嵌入MPI功能,需要引入一個新的排程模型,暫時命名為“屏障排程”(浪尖直譯自barrier scheduling),該排程模型會同時啟動任務,併為使用者提供足夠的資訊和工具,將分散式DL訓練嵌入到
1. 要求
概述
每個job中單個barrier stage。
每個job中多個barrier stage。
多job且每個job都帶有barrier stage。
Barrier stage 請求的slot比可用的slot多(無動態資源申請)。
Barrier stage請求的slot比可用的slot多(有動態資源申請)。(Spark 2.4就不要想了)
目標
支援
正確的處理失敗的場景。
Barrier執行模式支援執行與Standalone模式
使用yarn/mesos/k8s的使用者可以再有BarrierStage的時候設定MPI。
安全
使用者使用外部執行緒啟動MPI任務的時候,存在外部進行不被殺死而導致記憶體洩漏的風險。Barrier tasks會使用遠端客戶端相互交流,但是不會影響Spark當前的安全模型。
API變化
class RDD[T] {
/** Indicates that Spark must launch the tasks together for the current stage. */
def barrier(): RDDBarrier[T] = ???
}
/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
class BarrierTaskContext extends TaskContext {
/** Sets a global barrier and waits until all tasks in this stage hit this
barrier. */
def barrier(): Unit = ???
/** Returns the all task infos in this stage. */
def getTaskInfos(): Array[BarrierTaskInfo]
}
/** Represents an RDD barrier, which forces Spark to launch tasks of this stage
together. */
class RDDBarrier[T] {
/** Maps partitions together with a provided [[BarrierTaskContext]]. */
def mapPartitions[S](f: Iterator[T] => Iterator[S]): RDD[S] = ???
/** TODO extra conf(e.g. timeout) */
}
使用案例
rdd.barrier().mapPartitions { iter =>
// Write iter to disk.
???
// Fetch TaskContext
val context = BarrierTaskContext.get()
// Wait until all tasks finished writing.
context.barrier()
// The 0-th task launches an MPI job.
if (context.partitionId() == 0) {
val hosts = context.getTaskInfos().map(_.address)
// Set up MPI machine file using host infos.
???
// Launch the MPI job by calling mpirun.
??? }
// Wait until the MPI job finished.
context.barrier()
// Collect output and return.
??? }
3. 架構
設計提議
為了使spark支援屏障排程(barrier scheduling),在Spark內部增加了RDDBarrier和BarrierTaskContext。
BarrierStage
如果沒有充足的slot資源,barrier stage不會被拉起(也即是空閒的core 必須能夠拉起該barrier所有tasks),這樣設計使為了滿足一次拉起所有task的目標。
同時當任意的task執行失敗的時候,總是重啟整個barrier stage。
判斷一個stage是否是Barrier stage的一種方式是跟蹤Stage所包含的RDD,如果該stage包含RDDBarrier 或者至少一個父RDD是RDDBarrier,那麼該stage是一個barrier stage,當然要以shuffleDependency為界限。
排程Barrier Tasks
目前,TaskScheduler會盡可能的在可用的slot上排程task,所以通常不會同時啟動同一個stage的所有task。因此需要在barrier stage 的task在排程之前加上資源可用性判斷。由於任務的區域性性問題,仍然可能僅啟動整個barrier stage的部分tasks,因此必須在啟動任務之前在此檢查確認同一個barrier stage的所有task同時被啟動。
Barrier tasks預計比常規tasks具有更長的生命週期,因此barriertasks可能會在相對長的時間範圍內佔用叢集資源,後續提交的任務估計會延遲執行或者僅使用更少的slot執行。建議使用Fair排程策略進行排程,而不是預設的FIFO排程策略,並將barrier任務獨立執行,這樣至少可以保證普通任務可以在配置給定最少的叢集資源上執行。
另一個問題是barrier stage可以提交,但是叢集當前沒有足夠的slot來同時啟動所有barrier tasks。如果啟用了動態資源分配,則在等待一段時間後,可能會或可能不會滿足要求(取決於允許的最大節點)。對於Spark 2.4,提出了一個簡單的解決方案,它只檢查當前執行的slot的總數,如果數量不足以同時啟動同一個stage的所有屏障任務,則取消該job。目標是在3.0的時候可以更好地與動態資源分配整合。對於Spark 2.4,在啟用動態資源分配時,job會立即失敗,或者job無法連續提交,因為它試圖提交一個barrier stage,該stage需要比叢集中當前可用的slot更多的slot。
Task Barrier
Barrier tasks應允許使用者在task執行過程中插入同步操作,這可以通過在BarrierTaskContext中引入全域性barrier操作來實現,這使得當前任務等待直到同一stage中的所有task都達到此barrier。將為BarrierTaskContext.barrier()提交單獨的設計文件。
關注公眾號,bigdatatip,回覆barrier 即可獲得該文件。
失敗容錯
為確保正確性,當任何task失敗時,barrier stage始終會重試整個stage。因此,將要求殺死失敗stage的所有正在執行的任務,並且還保證每個單個stage最多隻能執行一個taskset (沒有zombie task),這是非常簡單的。理想情況下,除了在zombie taskset中殺死正在執行的任務需要一段時間,每個單一stage只應執行一個taskset,必須將失敗的taskset標記為zombie 並正確處理TaskKilled訊息。
推測任務(Speculativetask)
在barrier 執行模式中,要求每個barrier task必須僅有一個唯一的task ID,目的是其他的tasks 可以直接使用該ID和它互動。這也就意味著每個task只能嘗試啟動一次,因此必須禁止推測執行。
此外,3.0的時候可能會將Spark任務推測執行設定為單個stage的配置而不是全域性配置。
SparkContext.runJob()/PartitionPruningRDD
SparkContext.runJob()執行的時候可以僅是所有分割槽的子集,其中一個用例是RDD.first(),不會執行所有分割槽。這種是與barrer執行模式衝突的,可能無法啟動某些barrier tasks。在barrier stage檢測到這種用法,會由於不支援該操作而丟擲異常。
ParititionPruningRDD的情況類似,它只在滿足`partitionFilterFunc`的分割槽上啟動任務。我們將在barrierstage 檢測PartitionPruningRDD並丟擲顯式異常。
以上問題都與父RDD與生成的RDD具有不同分割槽數的問題有關(例如union()/ coalesce()/ first()/ take()/ PartitionPruningRDD),因此可以檢測RDD的血統鏈條,然後在job 提交的時候立即停止。
如果RDD依賴於多個barrier RDD(例如,barrierRdd1.zip(barrierRdd2)),也將立即停止,如果發生這種情況,則無法確保`barrier()`呼叫的正確行為。
針對Spark 3.0,可以進一步調查上述用例並提出支援它們的方法。
本文牽涉到的英文原文,關注公眾號 bigdatatip,輸入 barrier 獲取。
推薦閱讀:
歡迎點贊,轉發!