Spark Scheduling Series ----- 5. Skipped Execution of Spark Tasks and Stages (tasks and stages shown as skipped in the UI)
The Spark UI home page often shows tasks and stages marked as skipped, as in the screenshot below.
This article explains under which circumstances a stage or task is displayed as skipped, and whether the Spark application will run into problems when stages and tasks are shown as skipped.
After the last task of a Spark job's ResultStage completes successfully, the DAGScheduler.handleTaskCompletion method posts a SparkListenerJobEnd event. The source code is as follows:
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)

  outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
    event.taskInfo.attempt, event.reason)

  // The success case is dealt with separately below, since we need to compute accumulator
  // updates before posting.
  if (event.reason != Success) {
    val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
    listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
      event.taskInfo, event.taskMetrics))
  }

  if (!stageIdToStage.contains(task.stageId)) {
    // Skip all the actions if the stage has been cancelled.
    return
  }

  val stage = stageIdToStage(task.stageId)
  event.reason match {
    case Success =>
      listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
        event.reason, event.taskInfo, event.taskMetrics))
      stage.pendingTasks -= task
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.resultOfJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                // (all tasks of the ResultStage are done, so post the SparkListenerJobEnd event)
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(resultStage)
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }
The JobProgressListener.onJobEnd method handles the SparkListenerJobEnd event. The code is as follows:
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
  val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
    logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
    new JobUIData(jobId = jobEnd.jobId)
  }
  jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)

  jobData.stageIds.foreach(pendingStages.remove)
  jobEnd.jobResult match {
    case JobSucceeded =>
      completedJobs += jobData
      trimJobsIfNecessary(completedJobs)
      jobData.status = JobExecutionStatus.SUCCEEDED
      numCompletedJobs += 1
    case JobFailed(exception) =>
      failedJobs += jobData
      trimJobsIfNecessary(failedJobs)
      jobData.status = JobExecutionStatus.FAILED
      numFailedJobs += 1
  }
  for (stageId <- jobData.stageIds) {
    stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
      jobsUsingStage.remove(jobEnd.jobId)
      if (jobsUsingStage.isEmpty) {
        stageIdToActiveJobIds.remove(stageId)
      }
      stageIdToInfo.get(stageId).foreach { stageInfo =>
        // A stage of this job that was never submitted for execution is counted here,
        // together with its tasks, as a skipped stage and skipped tasks
        if (stageInfo.submissionTime.isEmpty) {
          // if this stage is pending, it won't complete, so mark it as "skipped":
          skippedStages += stageInfo
          trimStagesIfNecessary(skippedStages)
          jobData.numSkippedStages += 1
          jobData.numSkippedTasks += stageInfo.numTasks
        }
      }
    }
  }
}
StageInfo.submissionTime is set when a stage is broken down into a TaskSet, just before the TaskSet is submitted to the TaskSetManager. The source code is as follows:
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // partitionsToCompute is a list of the indexes of every partition this stage must compute
  val partitionsToCompute: Seq[Int] = {
    stage match {
      case stage: ShuffleMapStage =>
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull

  runningStages += stage
  // SparkListenerStageSubmitted should be posted before testing whether tasks are
  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
  // event.
  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
  outputCommitCoordinator.stageStart(stage.id)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
    }

    // Wrap the task information in a broadcast variable so it is shipped to every Executor
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString)
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  // tasks describes every task of this stage: the stage id it belongs to, the partition it
  // processes, the hosts holding that partition, and the Executor ids
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          /*
           * Get the preferred locations of the task: the nodes holding the data are preferred
           * for launching the task that processes it. ShuffleMapStage is used here.
           */
          val locs = getPreferredLocs(stage.rdd, id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, taskBinary, part, locs) // taskBinary is the broadcast variable
        }

      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        partitionsToCompute.map { id =>
          val p: Int = job.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = getPreferredLocs(stage.rdd, p)
          new ResultTask(stage.id, taskBinary, part, locs, id)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
    // Set StageInfo.submissionTime to mark that this TaskSet will actually be executed
    // and therefore will not be counted as skipped
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else
If a stage of the job is never broken down into a TaskSet and submitted for execution, that stage and its tasks are counted and displayed as skipped stages and skipped tasks.
So which stages are never broken down into TaskSets and executed?
When Spark submits a job, it posts a JobSubmitted event. After DAGScheduler.doOnReceive receives the JobSubmitted event, it calls DAGScheduler.handleJobSubmitted to handle the job submission.
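For reference, the dispatch looks roughly like this (an abridged sketch based on the same generation of Spark sources; all other event cases are omitted):

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // JobSubmitted carries the same arguments that handleJobSubmitted accepts
  // (see its signature further below)
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)
  // ... other events omitted ...
}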
DAGScheduler.handleJobSubmitted first calls DAGScheduler.newResultStage to create the final stage. Through the chain of calls below, DAGScheduler.newResultStage eventually reaches DAGScheduler.registerShuffleDependencies, which adds all of the RDD's ancestor stages to the DAGScheduler.jobIdToStageIds HashMap. handleJobSubmitted then fetches the StageInfo of each stage of the job, collects them into a Seq, and posts a SparkListenerJobStart event.
DAGScheduler.newResultStage ->
DAGScheduler.getParentStagesAndId ->
DAGScheduler.getParentStages ->
DAGScheduler.getShuffleMapStage ->
DAGScheduler.registerShuffleDependencies
DAGScheduler.registerShuffleDependencies first calls DAGScheduler.getAncestorShuffleDependencies to find all ancestor shuffle dependencies of the current RDD (parents, grandparents, and beyond), and then calls DAGScheduler.newOrUsedShuffleStage to create a ShuffleMapStage for each ancestor shuffle dependency (a simplified sketch of getAncestorShuffleDependencies follows the source below):
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
  // Get all ancestor shuffle dependencies of this RDD: parents, grandparents, and so on
  val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
  while (parentsWithNoMapStage.nonEmpty) {
    val currentShufDep = parentsWithNoMapStage.pop()
    // Create a stage from the ShuffleDependency and the job id. Because dependencies are popped
    // off a stack, the root stage is created first, and so on; the earlier a stage is created,
    // the smaller its shuffleId
    val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
    shuffleToMapStage(currentShufDep.shuffleId) = stage
  }
}
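getAncestorShuffleDependencies itself is not shown above, so here is a simplified, standalone sketch of its traversal (not the exact Spark source; collectAncestorShuffleDeps and the alreadyRegistered predicate are hypothetical names standing in for the real method and the shuffleToMapStage lookup): it is an iterative depth-first walk over the RDD lineage that collects every ShuffleDependency which does not yet have a registered ShuffleMapStage.

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

def collectAncestorShuffleDeps(
    rdd: RDD[_],
    alreadyRegistered: Int => Boolean): mutable.Stack[ShuffleDependency[_, _, _]] = {
  val parents = new mutable.Stack[ShuffleDependency[_, _, _]]
  val visited = new mutable.HashSet[RDD[_]]
  // Use an explicit stack instead of recursion to avoid StackOverflowError on long lineages
  val waitingForVisit = new mutable.Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val r = waitingForVisit.pop()
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] if !alreadyRegistered(shufDep.shuffleId) =>
            parents.push(shufDep) // an ancestor shuffle that still needs a ShuffleMapStage
          case _ => // narrow dependencies are simply walked through
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }
  parents
}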
private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.size
  // Create the stage
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.size) {
      stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
    }
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
  }
  stage
}
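The outputLocs and numAvailableOutputs restored above are what later keep such a stage from being resubmitted. A minimal, standalone sketch of that decision (hypothetical types and names; in the real scheduler this corresponds to the Stage.isAvailable check used by getMissingParentStages):

// A ShuffleMapStage whose registered map outputs cover every partition is "available",
// so the scheduler never turns it into a TaskSet again and its submissionTime stays empty.
case class SimpleShuffleStage(numPartitions: Int, numAvailableOutputs: Int) {
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

// Only parents that are not yet available need to be recomputed and submitted as TaskSets.
def missingParents(parents: List[SimpleShuffleStage]): List[SimpleShuffleStage] =
  parents.filterNot(_.isAvailable)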
DAGScheduler.newOrUsedShuffleStage calls DAGScheduler.newShuffleMapStage to create the stage.
After DAGScheduler.newShuffleMapStage has created the stage, it calls DAGScheduler.updateJobIdStageIdMaps to add the newly created stage.id to DAGScheduler.jobIdToStageIds. The source code is as follows:
private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
  def updateJobIdStageIdMapsList(stages: List[Stage]) {
    if (stages.nonEmpty) {
      val s = stages.head
      s.jobIds += jobId
      // Add the stage id to jobIdToStageIds
      jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
      val parents: List[Stage] = getParentStages(s.rdd, jobId)
      val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
      updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
    }
  }
  updateJobIdStageIdMapsList(List(stage))
}
The source code of DAGScheduler.handleJobSubmitted is as follows:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the ResultStage. Inside this method, every stage this job may go through
    // is registered in jobIdToStageIds
    finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      // Get all stage ids of this job. All of the job's stages were created while
      // newResultStage ran, so the lookup succeeds here
      val stageIds = jobIdToStageIds(jobId).toArray
      // Get the StageInfo of each stage
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      // Post the job start event SparkListenerJobStart
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}
JobProgressListener.onJobStart handles the SparkListenerJobStart event. It puts all the StageInfo objects created in DAGScheduler.handleJobSubmitted into the JobProgressListener.stageIdToInfo HashMap.
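The relevant part of JobProgressListener.onJobStart looks roughly as follows (an abridged sketch; the job-level bookkeeping is omitted):

override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
  // ... JobUIData / active-job bookkeeping omitted ...
  // Store the StageInfo received from the scheduler so that stage descriptions can be
  // shown even for stages that are still pending
  for (stageInfo <- jobStart.stageInfos) {
    stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
    stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
  }
}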
At this point we can conclude: the JobProgressListener.stageIdToInfo entries processed in JobProgressListener.onJobEnd are produced when DAGScheduler.handleJobSubmitted runs, that is, before any of the job's stages are broken down into tasks.
As explained in an earlier article of mine, when stages are broken down into TaskSets, if an RDD has already been cached in the BlockManager, then none of that RDD's ancestor stages are broken down into TaskSets for execution. For those ancestor stages, StageInfo.submissionTime.isEmpty therefore returns true, so those ancestor stages and their tasks are shown as skipped in the Spark UI.
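As a concrete illustration, here is a minimal sketch (assuming a SparkContext named sc and an existing input file; the path and variable names are only for illustration) that reproduces skipped stages in the UI:

val words  = sc.textFile("input.txt").flatMap(_.split(" "))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _).cache()

counts.count()   // Job 0: every stage in the lineage runs
counts.collect() // Job 1: reuses the cached RDD and the registered shuffle output,
                 // so the ancestor stages of counts are shown as "skipped" in the UI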
After a stage finishes execution, JobProgressListener.onStageCompleted runs and saves the stage's information into JobProgressListener.stageIdToInfo. The source code is as follows:
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
  val stage = stageCompleted.stageInfo
  // Save the stage's info so it can be tracked and displayed
  stageIdToInfo(stage.stageId) = stage
  val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
    logWarning("Stage completed for unknown stage " + stage.stageId)
    new StageUIData
  })
After all tasks in the stage's TaskSet complete successfully, the stage's StageInfo is written back into JobProgressListener.stageIdToInfo, so those tasks will no longer be shown as skipped.