Spark job submission (7)
阿新 · Published: 2018-11-23
When a task runs on an executor, the TaskRunner eventually calls execBackend.statusUpdate to report the task's state back to the driver.

spark-master\core\src\main\scala\org\apache\spark\executor\CoarseGrainedExecutorBackend.scala

statusUpdate wraps the update in a StatusUpdate message and sends it straight to the driver endpoint via driverRef.send:

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}

Under Spark's RPC messaging model, a message delivered with send is handled by the receiving endpoint's receive method.

spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    // call statusUpdate on the TaskScheduler
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
}
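This is the usual send-and-pattern-match message style: the sender wraps the state change in a case-class message and fires it at an endpoint, and the endpoint dispatches on the message type with a PartialFunction. Below is a minimal standalone sketch of that pattern only; ToyMessage, ToyStatusUpdate and ToyDriverEndpoint are made-up names, and a plain blocking queue stands in for Spark's RpcEndpoint and dispatcher machinery.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative message type, mirroring the shape of Spark's StatusUpdate message.
sealed trait ToyMessage
case class ToyStatusUpdate(executorId: String, taskId: Long, state: String) extends ToyMessage

// A toy "driver endpoint": messages delivered with send() are queued and later
// dispatched to the receive partial function, one at a time.
class ToyDriverEndpoint {
  private val inbox = new LinkedBlockingQueue[ToyMessage]()

  def send(msg: ToyMessage): Unit = inbox.put(msg)

  // Dispatch table, analogous to the PartialFunction returned by receive above.
  val receive: PartialFunction[ToyMessage, Unit] = {
    case ToyStatusUpdate(execId, taskId, state) =>
      println(s"driver: task $taskId on executor $execId is now $state")
  }

  // Drain the inbox and hand every understood message to receive
  // (single-threaded here; Spark uses a dispatcher with one inbox per endpoint).
  def processAll(): Unit = {
    while (!inbox.isEmpty) {
      val msg = inbox.take()
      if (receive.isDefinedAt(msg)) receive(msg)
    }
  }
}

object SendReceiveDemo extends App {
  val driver = new ToyDriverEndpoint
  driver.send(ToyStatusUpdate("exec-1", 42L, "FINISHED"))
  driver.processAll() // prints: driver: task 42 on executor exec-1 is now FINISHED
}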
scheduler.statusUpdate takes the update into the TaskScheduler.

spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(
                SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // once the task has reached a terminal state, hand the result to the TaskResultGetter
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }

A FINISHED task is handed to TaskResultGetter.enqueueSuccessfulTask, which deserializes the result on a dedicated thread pool.

spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskResultGetter.scala

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    serializedData: ByteBuffer): Unit = {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        // the final computed result
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] =>
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value(taskResultSerializer.get())
            (directResult, serializedData.limit())

          // the result was stored in the BlockManager on the worker node
          case IndirectTaskResult(blockId, size) =>
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // fetch the result from the remote worker
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            // deserialize the fetched result
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get.toByteBuffer)
            // force deserialization of referenced value
            deserializedResult.value(taskResultSerializer.get())
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }
        // process the fetched result
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case e: Exception => // error handling elided in this excerpt
      }
    }
  })
}

A DirectTaskResult already carries the serialized value inline; an IndirectTaskResult means the executor left the result in its BlockManager (typically because it was too large to ship inline), so the driver fetches it with getRemoteBytes, deserializes it, and then removes the block. Either way, the deserialized result is passed on to scheduler.handleSuccessfulTask.

spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  // delegate to the TaskSetManager that owns this task
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

The TaskSetManager then notifies the DAGScheduler:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  // call the DAGScheduler's taskEnded method
  sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
  maybeFinishTaskSet()
}

spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    taskInfo: TaskInfo): Unit = {
  // post puts a CompletionEvent onto the event queue; it will be picked up
  // and handled by the event loop's onReceive method
  eventProcessLoop.post(
    CompletionEvent(task, reason, result, accumUpdates, taskInfo))
}

override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    // delegate to doOnReceive for the actual dispatch
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  // the CompletionEvent posted above ends up here and is handled by
  // dagScheduler.handleTaskCompletion
  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)
}
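taskEnded itself does no scheduling work: it only posts a CompletionEvent, and a dedicated event-loop thread later takes the event off a blocking queue and runs onReceive/doOnReceive. The sketch below shows that post-then-handle pattern in isolation; ToyEventLoop and ToyCompletionEvent are illustrative stand-ins, not Spark's actual EventLoop / DAGSchedulerEventProcessLoop classes.

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// A minimal single-threaded event loop in the spirit of Spark's EventLoop:
// post() enqueues an event, a background thread dequeues it and calls onReceive.
abstract class ToyEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take() // blocks until an event arrives
        onReceive(event)
      }
    }
  }

  def start(): Unit = eventThread.start()

  // Only flips a flag; the daemon thread may stay blocked on take() until the JVM exits.
  def stop(): Unit = stopped.set(true)

  def post(event: E): Unit = eventQueue.put(event)

  // Subclasses decide how each event is handled (cf. doOnReceive above).
  protected def onReceive(event: E): Unit
}

// Usage: a loop that just logs task-completion events.
case class ToyCompletionEvent(taskId: Long, result: Any)

object EventLoopDemo extends App {
  val loop = new ToyEventLoop[ToyCompletionEvent]("toy-dag-event-loop") {
    override protected def onReceive(event: ToyCompletionEvent): Unit =
      println(s"completed task ${event.taskId} with result ${event.result}")
  }
  loop.start()
  loop.post(ToyCompletionEvent(7L, 99))
  Thread.sleep(100) // give the daemon thread time to drain the queue
  loop.stop()
}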
For a successful task, handleTaskCompletion then distinguishes between a ResultTask and a ShuffleMapTask:

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  event.reason match {
    case Success =>
      task match {
        // handle a ResultTask
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.activeJob match {
            case Some(job) =>
              if (!job.finished(rt.outputId)) {
                job.finished(rt.outputId) = true
                job.numFinished += 1
                // If the whole job has finished, remove it
                // (i.e. check whether every partition of the job has been computed)
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(resultStage)
                  cleanupStateForJobAndIndependentStages(job)
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  // notify the job's listener (a JobWaiter) that this partition is done
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        // handle a ShuffleMapTask
        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          val status = event.result.asInstanceOf[MapStatus]
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
            // This task was for the currently running attempt of the stage. Since the task
            // completed successfully from the perspective of the TaskSetManager, mark it as
            // no longer pending (the TaskSetManager may consider the task complete even
            // when the output needs to be ignored because the task's epoch is too small below.
            // In this case, when pending partitions is empty, there will still be missing
            // output locations, which will cause the DAGScheduler to resubmit the stage below.)
            shuffleStage.pendingPartitions -= task.partitionId
          }
      }
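In the ResultTask branch above, each successful partition flips one slot of job.finished, bumps job.numFinished, and is reported to job.listener (a JobWaiter on the driver); the job counts as done once numFinished reaches numPartitions. Here is a compact sketch of that bookkeeping, with a hypothetical ToyJobWaiter standing in for Spark's JobWaiter:

// Sketch of the "count finished partitions" bookkeeping done for a ResultStage.
// ToyJobWaiter is made up; Spark's JobWaiter plays the same role of collecting
// per-partition results and signalling overall job completion.
class ToyJobWaiter(numPartitions: Int) {
  private val finished = Array.fill(numPartitions)(false)
  private val results = new Array[Any](numPartitions)
  private var numFinished = 0

  // Called once per successfully completed output partition (cf. taskSucceeded).
  def taskSucceeded(outputId: Int, result: Any): Unit = synchronized {
    if (!finished(outputId)) {        // ignore duplicate completions of the same partition
      finished(outputId) = true
      results(outputId) = result
      numFinished += 1
      if (numFinished == numPartitions) {
        println(s"job finished: all $numPartitions partitions completed")
      }
    }
  }

  def isFinished: Boolean = synchronized { numFinished == numPartitions }
}

object JobWaiterDemo extends App {
  val waiter = new ToyJobWaiter(numPartitions = 3)
  waiter.taskSucceeded(0, 10)
  waiter.taskSucceeded(1, 20)
  waiter.taskSucceeded(2, 30)   // prints "job finished: all 3 partitions completed"
  println(waiter.isFinished)    // true
}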