【Spark】A Brief Analysis of the DAGScheduler Source Code
DAGScheduler
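DAGScheduler's main task is to build a DAG of stages and to decide the preferred location for each task. Its responsibilities include:
- Tracking which RDDs and stage outputs have been materialized
- Acting as the stage-oriented scheduling layer: it generates a DAG of stages for each job and submits TaskSets to the TaskScheduler for execution
- Resubmitting stages whose shuffle output has been lost
Within each stage the tasks are independent of one another: they all run the same compute function and share the same shuffle dependencies. When the DAG is split into stages, a stage boundary is drawn wherever a shuffle occurs.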
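To make the shuffle-boundary rule concrete, here is a minimal Scala sketch. It does not use Spark's real RDD or Dependency classes: Node, Narrow, Shuffle, and StageSplitDemo are hypothetical names, and node names are assumed to be unique. Starting from the final node, it pipelines everything reachable through narrow dependencies into one stage and turns each shuffle dependency into the root of a parent stage, returning the stages parents-first.

```scala
object StageSplitDemo {
  // Hypothetical, simplified model; not Spark's RDD/Dependency classes.
  sealed trait Dep { def parent: Node }
  final case class Narrow(parent: Node) extends Dep
  final case class Shuffle(parent: Node) extends Dep
  final case class Node(name: String, deps: List[Dep] = Nil)

  // Returns the stages parents-first; each stage lists the names of the nodes pipelined into it.
  def stages(finalNode: Node): List[List[String]] = {
    val built = scala.collection.mutable.LinkedHashMap.empty[String, List[String]]

    // Nodes reachable through narrow deps join the current stage;
    // shuffle parents are returned as the roots of parent stages.
    def collect(root: Node): (List[String], List[Node]) = {
      val visited = scala.collection.mutable.Set.empty[String] // names assumed unique
      var members = List.empty[String]
      var shuffleParents = List.empty[Node]
      def visit(n: Node): Unit =
        if (visited.add(n.name)) {
          members = n.name :: members
          n.deps.foreach {
            case Narrow(p)  => visit(p)
            case Shuffle(p) => shuffleParents = p :: shuffleParents
          }
        }
      visit(root)
      (members, shuffleParents.reverse)
    }

    def build(root: Node): Unit =
      if (!built.contains(root.name)) {
        val (members, parents) = collect(root)
        parents.foreach(build) // parent stages are built (and recorded) first
        built(root.name) = members
      }

    build(finalNode)
    built.values.toList
  }
}
```

For a chain textFile -> map -> (shuffle) reduceByKey -> mapValues, the sketch yields two stages, [textFile, map] and [reduceByKey, mapValues], because the only cut happens at the shuffle dependency.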
Instantiating DAGScheduler
The following code shows how SparkContext instantiates the DAGScheduler:
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
  dagScheduler = new DAGScheduler(this)
} catch {
  case e: Exception => {
    try {
      stop()
    } finally {
      throw new SparkException("Error while constructing DAGScheduler", e)
    }
  }
}
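The construction code above follows a cleanup-then-rethrow pattern: if creating the scheduler fails, the context stops itself and rethrows the error wrapped in a more descriptive exception, keeping the original as the cause. A minimal sketch of the same pattern, in which Context, Scheduler, and InitException are hypothetical stand-ins for SparkContext, DAGScheduler, and SparkException:

```scala
object InitDemo {
  // Hypothetical stand-in for SparkException.
  final class InitException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

  // Hypothetical component whose constructor may fail.
  final class Scheduler(fail: Boolean) {
    if (fail) throw new IllegalStateException("boom")
  }

  final class Context(failScheduler: Boolean) {
    @volatile var stopped: Boolean = false
    @volatile var scheduler: Scheduler = null

    def stop(): Unit = stopped = true

    try {
      scheduler = new Scheduler(failScheduler)
    } catch {
      case e: Exception =>
        try {
          stop() // release whatever was already set up
        } finally {
          // Rethrow wrapped, keeping the original exception as the cause.
          throw new InitException("Error while constructing Scheduler", e)
        }
    }
  }
}
```

Putting the throw in the finally block guarantees the wrapped exception is raised even if stop() itself fails.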
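The following code shows DAGScheduler's constructor definitions. A DAGScheduler is created bound to a TaskScheduler, and the auxiliary constructors call the primary constructor, filling in its parameters from the fields of sc: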
private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {
  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }
  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
  // ... (remainder of the class body omitted)
}
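The constructor chaining can be reproduced in isolation. In this hypothetical sketch, Ctx and Env stand in for SparkContext and SparkEnv, and plain strings stand in for the real collaborators. Each auxiliary constructor delegates to a longer one, filling in the missing arguments from the context's fields, and the defaulted clock parameter is simply omitted, as in the Spark code above.

```scala
object CtorDemo {
  // Hypothetical stand-ins for SparkEnv and SparkContext.
  final case class Env(tracker: String, blockMaster: String)
  final class Ctx(val bus: String, val env: Env, val taskScheduler: String)

  final class Scheduler(
      val ctx: Ctx,
      val taskScheduler: String,
      val bus: String,
      val tracker: String,
      val blockMaster: String,
      val clock: () => Long = () => System.currentTimeMillis()) {

    // Auxiliary constructor: fill the remaining parameters from ctx's fields.
    def this(ctx: Ctx, taskScheduler: String) =
      this(ctx, taskScheduler, ctx.bus, ctx.env.tracker, ctx.env.blockMaster)

    // Shortest form: also take the task scheduler from the context.
    def this(ctx: Ctx) = this(ctx, ctx.taskScheduler)
  }
}
```

Scala requires an auxiliary constructor to start by calling a constructor defined earlier in the class, which is why the one-argument form delegates to the two-argument form rather than directly to the primary constructor.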
Job Submission and DAGScheduler Operations
Most actions submit a job. In the 1.0 source, the call chain for job submission is roughly:
sc.runJob()
-> dagScheduler.runJob
-> dagScheduler.submitJob
-> dagSchedulerEventProcessActor.JobSubmitted
-> dagScheduler.handleJobSubmitted
-> dagScheduler.submitStage
-> dagScheduler.submitMissingTasks
-> taskScheduler.submitTasks
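The detailed function calls made while a job is submitted and run are:
- sc.runJob -> dagScheduler.runJob -> submitJob
- DAGScheduler::submitJob creates a JobSubmitted event and sends it to the embedded eventProcessActor (in the 1.4 source, submitJob instead uses the DAGSchedulerEventProcessLoop class to handle events)
- After eventProcessActor receives JobSubmitted, it calls the processEvent handler
- The job is converted into stages: a finalStage is created and submitted for execution, with the key step being the call to submitStage
- submitStage computes the dependencies between stages; a dependency is either wide or narrow
- If the current stage turns out to have no dependencies, or all of its dependencies are already available, its tasks are submitted
- Tasks are submitted by calling submitMissingTasks
- Which worker a task actually runs on is managed by the TaskScheduler; that is, submitMissingTasks calls TaskScheduler::submitTasks
- TaskSchedulerImpl creates the backend that matches Spark's current run mode; when running on a single machine, it creates a LocalBackend
- LocalBackend receives the ReviveOffers event passed in from TaskSchedulerImpl
- reviveOffers -> executor.launchTask -> TaskRunner.run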
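The submitStage / submitMissingTasks steps in the list above amount to a recursive, parents-first submission. The following is a simplified sketch under stated assumptions: Stage and Scheduler are hypothetical classes, a parent counts as "missing" when its output is not yet available, and "submitting tasks" is modeled by recording the stage id instead of building a TaskSet. It is modeled on the behavior described above, not copied from Spark.

```scala
import scala.collection.mutable

object SubmitStageDemo {
  // Hypothetical simplified stage: an id, its parent stages, and whether its output exists.
  final class Stage(val id: Int, val parents: List[Stage]) {
    var outputAvailable: Boolean = false
  }

  final class Scheduler {
    val waiting = mutable.LinkedHashSet.empty[Int]
    val running = mutable.LinkedHashSet.empty[Int]
    val submitted = mutable.ArrayBuffer.empty[Int] // order in which stages got tasks

    private def missingParents(s: Stage): List[Stage] =
      s.parents.filterNot(_.outputAvailable).sortBy(_.id)

    def submitStage(s: Stage): Unit =
      if (!waiting(s.id) && !running(s.id)) {
        val missing = missingParents(s)
        if (missing.isEmpty) {
          submitMissingTasks(s) // every input is ready: launch this stage
        } else {
          missing.foreach(submitStage) // schedule the parents first
          waiting += s.id              // park this stage until they finish
        }
      }

    // Stands in for building a TaskSet and calling taskScheduler.submitTasks.
    private def submitMissingTasks(s: Stage): Unit = {
      running += s.id
      submitted += s.id
    }

    // When a stage completes, retry every waiting stage (compare submitWaitingStages).
    def stageFinished(s: Stage, byId: Map[Int, Stage]): Unit = {
      running -= s.id
      s.outputAvailable = true
      val retry = waiting.toList
      waiting.clear()
      retry.foreach(id => submitStage(byId(id)))
    }
  }
}
```

With stage 2 depending on stages 0 and 1, submitting stage 2 launches 0 and 1 immediately and parks 2; stage 2 is only launched once both parents have reported completion.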
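DAGScheduler's runJob Function
DAGScheduler.runJob ultimately delivers the job's results through resultHandler.
Here, DAGScheduler's runJob calls DAGScheduler's submitJob to submit the job, then blocks on the returned waiter until the job succeeds or fails: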
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}
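runJob blocks on waiter.awaitResult() until every task has reported. A simplified waiter can be built from a finished-task counter plus wait/notifyAll. Waiter and its method names below are hypothetical stand-ins inspired by JobWaiter (the JobSucceeded/JobFailed results are redefined locally), not its actual implementation. Like the submitJob code that returns a JobWaiter for zero partitions, a waiter with zero tasks starts out already succeeded.

```scala
object WaiterDemo {
  sealed trait JobResult
  case object JobSucceeded extends JobResult
  final case class JobFailed(e: Exception) extends JobResult

  // Hypothetical simplified waiter: counts finished tasks and wakes the caller
  // once all of them have reported, or as soon as the job fails.
  final class Waiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
    private var finishedTasks = 0
    private var result: Option[JobResult] =
      if (totalTasks == 0) Some(JobSucceeded) else None

    def taskSucceeded(index: Int, value: T): Unit = synchronized {
      resultHandler(index, value) // hand each partition's result to the caller
      finishedTasks += 1
      if (finishedTasks == totalTasks) {
        result = Some(JobSucceeded)
        notifyAll()
      }
    }

    def jobFailed(e: Exception): Unit = synchronized {
      result = Some(JobFailed(e))
      notifyAll()
    }

    // Blocks the calling thread until a result is set.
    def awaitResult(): JobResult = synchronized {
      while (result.isEmpty) wait()
      result.get
    }
  }
}
```

Because resultHandler runs inside the waiter's lock, a caller that reads its result buffer after awaitResult() returns sees every value the tasks delivered.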
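Scheduling the Submitted Job
In the Spark 1.4.0 source, DAGScheduler's submitJob no longer uses DAGSchedulerEventProcessActor for event handling and message passing. Instead, it posts the JobSubmitted event through eventProcessLoop, an instance of the DAGSchedulerEventProcessLoop class.
The submitJob function is shown below: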
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}
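Once eventProcessLoop has posted the JobSubmitted event, the eventThread inside that object processes it: the thread keeps taking events off the event queue and calls onReceive for each one. When an event matches JobSubmitted, onReceive calls DAGScheduler's handleJobSubmitted, passing in the jobId, the rdd, and the other parameters, to process the job.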
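The post / eventThread / onReceive mechanism just described is a producer-consumer loop over a blocking queue. A minimal sketch, assuming a single consumer thread; SimpleEventLoop, SchedulerEvent, and Loop are hypothetical names, and the strings recorded by onReceive only label which handler would be called.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean

object EventLoopDemo {
  // Hypothetical simplified event loop: one daemon thread drains a blocking queue.
  abstract class SimpleEventLoop[E](name: String) {
    private val queue = new LinkedBlockingDeque[E]()
    private val stopped = new AtomicBoolean(false)

    private val eventThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit =
        try {
          while (!stopped.get) {
            val event = queue.take() // blocks until an event is posted
            onReceive(event)
          }
        } catch {
          case _: InterruptedException => () // stop() interrupts a blocked take()
        }
    }

    def start(): Unit = eventThread.start()
    def stop(): Unit = { stopped.set(true); eventThread.interrupt(); eventThread.join() }
    // Called from the submitting thread; returns without waiting for processing.
    def post(event: E): Unit = queue.put(event)
    protected def onReceive(event: E): Unit
  }

  sealed trait SchedulerEvent
  final case class JobSubmitted(jobId: Int, partitions: Array[Int]) extends SchedulerEvent
  final case class JobCancelled(jobId: Int) extends SchedulerEvent

  // onReceive dispatches on the event type; here it only records which
  // (hypothetically named) handler would run.
  final class Loop(handled: BlockingQueue[String])
      extends SimpleEventLoop[SchedulerEvent]("demo-event-loop") {
    override protected def onReceive(event: SchedulerEvent): Unit = event match {
      case JobSubmitted(id, ps) => handled.put(s"handleJobSubmitted($id, ${ps.length} partitions)")
      case JobCancelled(id)     => handled.put(s"handleJobCancellation($id)")
    }
  }
}
```

Because post() only enqueues, submitJob can return its waiter immediately while all scheduling decisions are made on the single event thread, which keeps the scheduler's internal state free of concurrent mutation.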
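The handleJobSubmitted Function
handleJobSubmitted is a key step in processing a job. It is mainly responsible for analyzing RDD dependencies, generating the finalStage, and creating an ActiveJob from that finalStage.
Some comments have been added to the handleJobSubmitted source below: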
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    // Error handling: tell the listener the job failed, then return
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      // Actions such as collect() take this path: update the job/stage mappings,
      // notify the listeners, then submit the job
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // Submit the final stage
      submitStage(finalStage)
    }
  }
  // Submit any waiting stages that are now ready to run
  submitWaitingStages()
}
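The branch between local execution and normal submission can be isolated as follows. In this hypothetical sketch, Bookkeeping, Stage, and ActiveJob are simplified stand-ins; the shouldRunLocally condition mirrors the one in the listing, while the listener-bus posts, runLocally, and submitStage are replaced by appending ids to buffers.

```scala
import scala.collection.mutable

object HandleJobDemo {
  // Hypothetical simplified stand-ins for Spark's Stage and ActiveJob.
  final case class Stage(id: Int, parents: List[Stage])
  final case class ActiveJob(jobId: Int, finalStage: Stage, numPartitions: Int)

  final class Bookkeeping(localExecutionEnabled: Boolean) {
    val jobIdToActiveJob = mutable.HashMap.empty[Int, ActiveJob]
    val activeJobs = mutable.HashSet.empty[ActiveJob]
    val submittedStages = mutable.ArrayBuffer.empty[Int] // stands in for submitStage(finalStage)
    val ranLocally = mutable.ArrayBuffer.empty[Int]      // stands in for runLocally(job)

    // Same four-part condition as shouldRunLocally in handleJobSubmitted.
    def shouldRunLocally(allowLocal: Boolean, finalStage: Stage, numPartitions: Int): Boolean =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && numPartitions == 1

    def handleJobSubmitted(jobId: Int, finalStage: Stage, numPartitions: Int, allowLocal: Boolean): Unit = {
      val job = ActiveJob(jobId, finalStage, numPartitions)
      if (shouldRunLocally(allowLocal, finalStage, numPartitions)) {
        ranLocally += jobId // e.g. first() or take() over a single partition
      } else {
        jobIdToActiveJob(jobId) = job // register the job before its stages run
        activeJobs += job
        submittedStages += finalStage.id
      }
    }
  }
}
```

Only a job whose final stage has no parents and covers a single partition qualifies for local execution; anything that needs a shuffle goes through the normal stage-submission path.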
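Summary
This article covered how DAGScheduler is instantiated from SparkContext, how running an action submits a job through runJob, how the submitted job is scheduled through event messages, and the job-handling function handleJobSubmitted.
Because handleJobSubmitted involves dependency analysis and stage-related source code, I plan to introduce and analyze that code in the next article.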
When reposting, please credit the author, Jason Ding, and the original source.
GitCafe blog homepage (http://jasonding1354.gitcafe.io/)
GitHub blog homepage (http://jasonding1354.github.io/)
CSDN blog (http://blog.csdn.net/jasonding1354)
Jianshu homepage (http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Search Google for jasonding1354 to find my blog homepage