Spark 2.x 提交Job原始碼淺析
大家都知道,spark job的提交是觸發了Action操作,現在我在RDD.scala中找到collect運算元,在這下面是有一個runjob方法
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
然後繼續進入runjob方法,發現是對封裝(rdd,func,partitions)等引數的runjob的一連串呼叫,最後發現是dagScheduler呼叫了runjob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
繼續點進runjob,接下來進入到SparkContext最核心的DAGScheduler,這裡可以看到呼叫了一個submitJob方法,他是DAGScheduler的submitJob,它會提交一個Job任務,然後返回一個阻塞的執行緒等待Job完成
def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime //在這裡會提交一個Job任務,然後會返回一個阻塞的執行緒等待Job執行完成 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) //下面是根據不同的Job任務執行情況列印不同的Log資訊 waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }
然後進入到submitJob方法, eventProcessLoop是DAGSchedulerEventProcessLoop的一個例項
def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // 檢查分割槽是否存在保證Task正常執行 val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } //增加一個JobId作當前Job的標識 val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { // 如果沒有Task任務,將立即返回JobWaiter return new JobWaiter[U](this, jobId, 0, resultHandler) } //為分割槽做個判斷,確保分割槽大於0 assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] //首先構造一個JobWaiter阻塞執行緒,等待Job完成,然後把結果提交給resultHandler val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) //eventProcessLoop是DAGScheduler的事件佇列 //因為可能叢集執行著多個Job,而DAGScheduler預設是FIFO先進先出的資源排程 //這裡傳入的事件型別是JobSubmitted,而在eventProcessLoop會呼叫doOnReceive //來匹配事件型別並執行對應的操作,最終會匹配到dagScheduler、handleJobSubmitted eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
進入到eventProcessLoop 裡面new DAGSchedulerEventProcessLoop(this)專門用來接收Job和Stage的發來的訊息
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
//專門用來接收Job和Stage的發來的訊息
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
DAGSchedulerEventProcessLoop是個類,這個類裡面呼叫了很關鍵的onRceive方法和doOnReceive方法 ,它會對裡面的事件不斷地迴圈呼叫,要處理Stage的劃分、shuffleMapTask和ResultTask的劃分,還有一些job任務的提交和型別
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
//在這裡注意一下,它會進行很關鍵的操作
doOnReceive(event)
} finally {
timerContext.stop()
}
}
//然後通過模式匹配進行匹配哪個事件模型
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
//注意這裡是Stage劃分的精髓所在
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId, reason) =>
val filesLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, filesLost)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
進入handleJobSubmitted方法,先建立resultStage,它是真正開始處理Job劃分Stage的事件,劃分ShufflemapStage後,ShufflemapStage裡面會劃分ShufflemapTask,每個job由一個ResultStage和0+個ShufflemapShuffle組成,finalRDD和分割槽、jobId都會被傳進finalStage
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
//建立ResultStage 這裡才是真正開始處理提交的job劃分stage的時候
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
//每個job都是由1個ResultStage和0+個ShuffleMapStage組成
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
進入createResultStage方法,建立ResultStage的父Stage,將父stage的引數放入到new ResultStage()中,生成ResultStage
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
//開始建立ResultStage的父stage
//裡面有多個巢狀獲取shuffle依賴和迴圈建立shuffleMapStage,若沒有shuffle
//操作則返回空list
val parents = getOrCreateParentStages(rdd, jobId)
//當前的stageId標識+1
val id = nextStageId.getAndIncrement()
//放入剛剛生成的父stage等核心引數,生成ResultStage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
//把ResultStage和它的ID加入stageIdToList
stageIdToStage(id) = stage
//更新jobIds和jobIdToStageIds
updateJobIdStageIdMaps(jobId, stage)
//返回這個ResultStage
stage
}
進入到建立父Stage的方法getOrCreateParentStages
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
//從getshuffleDependencies開始
//這裡僅僅是抽取當前RDD的shuffle依賴
//(job的stage是以shuffle劃分的,1個job中只會生成1個resultStage和0+個
// shuffleMapStage,如果不是shuffleDependency就繼續抽取父RDD。。)
//迭代遍歷一直到抽取出為止或者沒有
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
進入getOrCreateShuffleMapStage方法中,進行匹配能不能取到ParentStage的值,當沒有parentStage的時候會返回空,能取到就返回stage,ShuffleMapStage是根據遍歷出的ShuffleDependencies一次次創建出來的
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
//通過從ShuffleDependency提取到的shuffleId來提取shuffleIdToMapStage中的
//shuffleMapStage
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
//如果能取到就直接返回
case Some(stage) =>
stage
//如果取不到就會依次找到所有父ShuffleDependencies並且構建
//所有父ShuffleMapStage
case None =>
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
//根據遍歷出來的所有shuffleDependency依次建立所有父ShuffleMapStage
//接下來進行判斷是否是父stage
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
//最後會建立當前ShuffleDependency的ShuffleMapStage
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
進入createShuffleMapStage方法 此方法是遞迴迴圈建立shuffleMapStage的過程
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
//ShuffleDependency的父RDD
val rdd = shuffleDep.rdd
//多少分割槽
val numTasks = rdd.partitions.length
//用父RDD迴圈呼叫,每次呼叫都是前一個父RDD
//在這裡其實就會一直遞迴迴圈直到拿到首個stage才退出來
//最後把生成的ShuffleMapStage加入shuffleIdToMapStage以便後面直接從中拿取
val parents = getOrCreateParentStages(rdd, jobId)
//標記當前StageId nextStageId+1
val id = nextStageId.getAndIncrement()
//拿到之前的stage等核心引數後就可以構建ShuffleMapStage了
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
//把剛建立的ShuffleMapStage賦值給stageIdToStage
stageIdToStage(id) = stage
//賦值給shuffleIdToMapStage
//若後面的程式碼再次生成對應的ShuffleMapStage就可以從shuffleIdToMapStage
//中直接拿取了
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
//更新jobIds的jobIdToStageIds
updateJobIdStageIdMaps(jobId, stage)
//這裡會把shuffle資訊註冊到Driver上的MapOutputTrackerMaster的
//shufflestatuses
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// A previously run stage generated partitions for this shuffle, so for each output
// that's still available, copy information about that output location to the new stage
// (so we don't unnecessarily re-compute that data).
//把shuffle資訊註冊到自己Driver的MapOutputTrackerMaster
//生成的是shuffleId和shuffleStatus的對映關係
//在後面提交Job的時候還會根據它來的map stage是否已經準備好
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
//這裡建立好ShuffleMapStage後
//可以看到把Shuffle資訊註冊到自己Driver的MapOutputTrackerMaster
//的shuffleStatuses中,用來在後面的驗證和reduce端拉取map輸出
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
//最後返回生成的ShuffleMapStage
stage
這個時候ShuffleMapStage已經建立完成了,並不是一次就建立完成,而是遇見shuffle的時候會由下往上遞迴建立ShuffleMapStage
今天先分享到這裡,有理解錯誤的地方歡迎批評指正 !