
Spark: Task Internals and Source Code Analysis

In Spark, before an application can actually run, it has to go through the following steps:
(figure: application → job → stage → taskSet → task)
This pipeline tells us that a job ultimately depends on tasks distributed across different nodes of the cluster, executing in parallel or concurrently to do the real work. The individual distributed tasks are therefore Spark's true workers. The diagram of the overall task execution framework gives a rough idea of how Spark runs a task.
(figure: overall task execution framework)

Before a task runs, the Worker starts the Executor, the Executor prepares its runtime environment and registers itself back with the Driver, and finally the Driver sends a LaunchTask message to the Executor. From the moment the Executor receives that LaunchTask message, the task is off and running, doing all subsequent work on a Java thread. Of course, before the task actually starts there is still some preparatory work, such as splitting the job into stages with the stage-partitioning algorithm and finding each task's preferred location with the task-locality algorithm (the first preference is that the data the task needs lives in the same process on the same node; the second is a different process on the same node; the third is a different node in the same rack; the fourth is a different rack). The goal is to reduce network traffic, save CPU, and improve overall system performance.
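Incidentally, how long the scheduler waits at each locality level before falling back to a less local one is configurable. A minimal sketch of tuning it (the spark.locality.wait* keys are standard Spark settings; the values are only for illustration):

  import org.apache.spark.SparkConf

  // How long to wait at each locality level before falling back to the next one
  // (process-local -> node-local -> rack-local -> any).
  val conf = new SparkConf()
    .setAppName("locality-demo")
    .set("spark.locality.wait", "3s")          // default wait shared by the levels below
    .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL
    .set("spark.locality.wait.node", "3s")     // NODE_LOCAL
    .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL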

A task does roughly the following:

  1. Pull the resources needed for execution over the network and deserialize them (the tasks of one stage run in parallel, or concurrently, across multiple Executors, and they all process the same RDD; this is made possible by a broadcast variable).
  2. Get the shuffleManager and obtain a shuffleWriter from it (the shuffleWriter is later used to write the result data produced by the processing to disk).
  3. Call rdd.iterator(), passing in the partition this task has to process (the user-defined operator or function is executed against that RDD partition; the returned data is written through the ShuffleWriter created above and, after being partitioned by HashPartitioner [the default], lands in the corresponding bucket, which is really just a disk file).
  4. Wrap the result as a MapStatus and send it to the MapOutputTracker, from which ResultTasks can fetch it. (MapStatus wraps the location of the data the ShuffleMapTask computed, i.e. the relevant BlockManager information; BlockManager is Spark's low-level component for managing memory, data, and disk.)
  5. A ResultTask pulls the output of the ShuffleMapTasks (the result of steps 2/3/4).

To support this process, Task has two subclasses, ShuffleMapTask and ResultTask. The former transforms the RDD through the various map-style operators and user-defined functions; the latter is triggered by an action: it pulls the new RDD produced after the map stage and runs our custom function body on it to implement the actual business logic. A small example follows.
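In a classic word count, for instance, reduceByKey introduces the shuffle boundary: the tasks of the stage before it run as ShuffleMapTasks, and the tasks of the final stage that feeds collect() run as ResultTasks. A minimal sketch using only public APIs (the input path and app name are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("wordcount-demo").setMaster("local[2]"))

  val counts = sc.textFile("/tmp/input.txt")   // stage 0: read + map, executed by ShuffleMapTasks
    .flatMap(_.split("\\s+"))
    .map(word => (word, 1))
    .reduceByKey(_ + _)                        // shuffle boundary
    .collect()                                 // stage 1: ResultTasks return data to the driver

  counts.foreach(println)
  sc.stop()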

Source code analysis:
Step 1: Receive the message sent by the Driver
Source: org.apache.spark.executor.Executor.scala

  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    // A TaskRunner (a thread of work) is created for every task
    // TaskRunner actually implements Java's Runnable interface
    val tr = new TaskRunner(context, taskDescription)
    // Put the TaskRunner into the in-memory cache; runningTasks maintains the list of running tasks.
    runningTasks.put(taskDescription.taskId, tr)
    // Wrap the task in a thread (the TaskRunner) and drop that thread into the thread pool to run.
    // The thread pool implements queueing: if no pool thread is currently free, the submitted work waits in line.
    threadPool.execute(tr)
  }
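Stripped of Spark specifics, this is the standard "one Runnable per task, tracked in a map, executed by a pool" pattern. A self-contained sketch of just that pattern (all names here are made up for illustration, not Spark's; the Runnable lambda needs Scala 2.12+):

  import java.util.concurrent.{ConcurrentHashMap, Executors}

  object MiniExecutor {
    private val threadPool   = Executors.newCachedThreadPool()
    private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

    def launchTask(taskId: Long, work: () => Unit): Unit = {
      val runner: Runnable = () => {
        try work()
        finally runningTasks.remove(taskId)   // mirrors the finally block in TaskRunner.run()
      }
      runningTasks.put(taskId, runner)        // keep track of running tasks
      threadPool.execute(runner)              // queued until a pool thread is free
    }
  }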

Step 2: The TaskRunner runs the Task

  /**
   * How the task actually runs
   */
  class TaskRunner(
    execBackend: ExecutorBackend,
    private val taskDescription: TaskDescription)
    extends Runnable {

......

    /**
     * The thread executes the run() method
     */
    override def run(): Unit = {
      threadId = Thread.currentThread.getId
      Thread.currentThread.setName(threadName)
      val threadMXBean = ManagementFactory.getThreadMXBean
      // Create the memory manager for our Task
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      // Record the deserialization start time
      val deserializeStartTime = System.currentTimeMillis()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      // A ClassLoader is needed when loading the concrete classes
      Thread.currentThread.setContextClassLoader(replClassLoader)
      // Create the serializer
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      // Call ExecutorBackend#statusUpdate to report the current state to the Driver
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      // Record run time and GC information
      var taskStartTime: Long = 0
      var taskStartCpu: Long = 0
      startGCTime = computeTotalGcTime()

      try {
        // Must be set before updateDependencies() is called, in case fetching dependencies
        // requires access to properties contained within (e.g. for access control).
        Executor.taskDeserializationProps.set(taskDescription.properties)
        // Copy the needed files, resources and jars over the network
        updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
        // Deserialize the Task
        // Java's ClassLoader is used here; a ClassLoader can do many things, e.g. dynamically load a class via reflection and create an instance of it,
        // and it can also be used to load and read resources in a given context
        task = ser.deserialize[Task[Any]](
          taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        task.localProperties = taskDescription.properties
        task.setTaskMemoryManager(taskMemoryManager)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        val killReason = reasonIfKilled
        if (killReason.isDefined) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException(killReason.get)
        }

        // The purpose of updating the epoch here is to invalidate executor map output status cache
        // in case FetchFailures have occurred. In local mode `env.mapOutputTracker` will be
        // MapOutputTrackerMaster and its cache invalidation is not based on epoch numbers so
        // we don't need to make any special calls here.
        if (!isLocal) {
          logDebug("Task " + taskId + "'s epoch is " + task.epoch)
          env.mapOutputTracker.asInstanceOf[MapOutputTrackerWorker].updateEpoch(task.epoch)
        }

        // Run the actual task and measure its runtime.
        // Record the task start time
        taskStartTime = System.currentTimeMillis()
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        // Run the task via its run() method
        /**
         * For a ShuffleMapTask, this value is a MapStatus, which wraps the location where the data computed by the ShuffleMapTask was written.
         * If the next task is another ShuffleMapTask, it will contact the MapOutputTracker to get the output locations of the previous ShuffleMapTasks and then pull the data over the network.
         * The same applies to a ResultTask.
         */
        val value = Utils.tryWithSafeFinally {
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
          res
        } {
          // Clean up all allocated memory and pages, and check for memory leaks
          val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

          if (freedMemory > 0 && !threwException) {
            val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
              throw new SparkException(errMsg)
            } else {
              logWarning(errMsg)
            }
          }

          if (releasedLocks.nonEmpty && !threwException) {
            val errMsg =
              s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
                releasedLocks.mkString("[", ", ", "]")
            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
              throw new SparkException(errMsg)
            } else {
              logInfo(errMsg)
            }
          }
        }
        task.context.fetchFailed.foreach { fetchFailure =>
          // uh-oh.  it appears the user code has caught the fetch-failure without throwing any
          // other exceptions.  Its *possible* this is what the user meant to do (though highly
          // unlikely).  So we will log an error and keep going.
          logError(s"TID ${taskId} completed successfully though internally it encountered " +
            s"unrecoverable fetch failures!  Most likely this means user code is incorrectly " +
            s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
        }
        // Record the task finish time
        val taskFinish = System.currentTimeMillis()
        val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L

        // If the task has been killed, let's fail it.
        task.context.killTaskIfInterrupted()
        
        // Here the MapStatus is serialized and wrapped in various ways, because it will later be sent to the Driver (over the network)
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()

        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
        /**
         * Here task-related statistics are computed, including the deserialization time, the JVM GC time and the time spent serializing the result.
         * These metrics are all shown in the Spark UI.
         */
        task.metrics.setExecutorDeserializeTime(
          (taskStartTime - deserializeStartTime) + task.executorDeserializeTime)
        task.metrics.setExecutorDeserializeCpuTime(
          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        task.metrics.setExecutorRunTime((taskFinish - taskStartTime) - task.executorDeserializeTime)
        task.metrics.setExecutorCpuTime(
          (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
        task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
        task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)

   ......

        // directSend = sending directly back to the driver
        val serializedResult: ByteBuffer = {
          // Check the size of the result object that would be returned directly
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            // Larger than the maximum limit (1 GB by default): drop the result
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize > maxDirectResultSize) {
            // Result is larger than the configured direct-send threshold: put it into the BlockManager
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId,
              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
              StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            // Return an IndirectTaskResult, i.e. the result is not handed back to the Driver directly
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            // Result is small: send it straight back to the Driver
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }

        setTaskFinishedAndClearInterruptStatus()
        // Calls the statusUpdate() method of the CoarseGrainedExecutorBackend that hosts this executor
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

      } catch {
      
       ......
      
      } finally {
        runningTasks.remove(taskId)
      }
    }
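Which of the three branches is taken depends on two thresholds: spark.driver.maxResultSize (1 GB by default) and the much smaller direct-send limit (derived from spark.task.maxDirectResultSize and the RPC message size). A stand-alone sketch of just that branching; the function and default values here are illustrative, not Spark's actual fields:

  // Illustrative only: mirrors the three ways a task result can travel back to the driver.
  def chooseResultPath(resultSize: Long,
                       maxResultSize: Long = 1L << 30,        // cf. spark.driver.maxResultSize
                       maxDirectResultSize: Long = 1L << 20   // cf. the direct-send limit
                      ): String = {
    if (maxResultSize > 0 && resultSize > maxResultSize) {
      "drop the result; send only an IndirectTaskResult carrying its size"
    } else if (resultSize > maxDirectResultSize) {
      "store the bytes in the BlockManager; send an IndirectTaskResult referencing the block"
    } else {
      "send the serialized result directly to the driver"
    }
  }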

Step 3: The updateDependencies() method

  private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
    // Get the Hadoop configuration
    lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    // Synchronize concurrent access from multiple threads
    /**
     * Java's synchronized is used here to serialize concurrent multi-threaded access,
     * because tasks actually run concurrently, as Java threads, inside a single CoarseGrainedExecutorBackend process.
     * If shared resources are accessed while executing the business logic, thread-safety problems can arise,
     * so Spark chooses to synchronize here, since shared state such as currentFiles is accessed inside this block
     */
    synchronized {
      // Fetch missing dependencies
      // Iterate over the files to fetch
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        // Fetch file with useCache mode, close cache for local mode.
        // Fetch the file from the remote side over the network
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
        currentFiles(name) = timestamp
      }
      // Iterate over the jars to fetch
      for ((name, timestamp) <- newJars) {
        // Compare timestamps: fetch only if the locally known timestamp is older than the target timestamp
        val localName = new URI(name).getPath.split("/").last
        val currentTimeStamp = currentJars.get(name)
          .orElse(currentJars.get(localName))
          .getOrElse(-1L)
        if (currentTimeStamp < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          // Fetch file with useCache mode, close cache for local mode.
          // Fetch the jar file
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
          currentJars(name) = timestamp
          // Add it to our class loader
          val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
          if (!urlClassLoader.getURLs().contains(url)) {
            logInfo("Adding " + url + " to class loader")
            urlClassLoader.addURL(url)
          }
        }
      }
    }
  }
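The dependencies fetched here are the ones registered on the driver with SparkContext.addFile / addJar; inside a task the local copy is resolved via SparkFiles.get. A minimal usage sketch (the paths are placeholders, and an existing SparkContext sc is assumed):

  import org.apache.spark.SparkFiles

  // Driver side: register dependencies; executors pull them via updateDependencies() before running tasks.
  sc.addFile("/tmp/lookup.csv")
  sc.addJar("/tmp/extra-udfs.jar")

  // Executor side (inside a task): resolve the local copy that was fetched.
  sc.parallelize(1 to 2)
    .map(_ => SparkFiles.get("lookup.csv"))
    .collect()
    .foreach(println)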

Step 4: The task's run() method

  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem): T = {
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
    // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
    // the stage is barrier.
    /**
     * Create a TaskContext, the task's execution context. It records global information about the task's execution,
     * e.g. how many times the task has been retried, which stage it belongs to, which partition of the rdd it has to process, and so on
     */
    val taskContext = new TaskContextImpl(
      stageId,
      stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
      partitionId,
      taskAttemptId,
      attemptNumber,
      taskMemoryManager,
      localProperties,
      metricsSystem,
      metrics)

    context = if (isBarrier) {
      new BarrierTaskContext(taskContext)
    } else {
      taskContext
    }

    TaskContext.setTaskContext(context)
    taskThread = Thread.currentThread()

    if (_reasonIfKilled != null) {
      kill(interruptThread = false, _reasonIfKilled)
    }

    new CallerContext(
      "TASK",
      SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
      appId,
      appAttemptId,
      jobId,
      Option(stageId),
      Option(stageAttemptId),
      Option(taskAttemptId),
      Option(attemptNumber)).setCurrentContext()

    try {
      // Call the abstract method runTask()
      runTask(context)
    } catch {
      case e: Throwable =>
        // Catch all errors; run task failure callbacks, and rethrow the exception.
        try {
          context.markTaskFailed(e)
        } catch {
          case t: Throwable =>
            e.addSuppressed(t)
        }
        context.markTaskCompleted(Some(e))
        throw e
    } finally {
      try {
        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
        // one is no-op.
        context.markTaskCompleted(None)
      } finally {
        try {
          Utils.tryLogNonFatalError {
            // Release memory used by this thread for unrolling blocks
            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
              MemoryMode.OFF_HEAP)
            // Notify any tasks waiting for execution memory to be freed to wake up and try to
            // acquire memory again. This makes impossible the scenario where a task sleeps forever
            // because there are no other tasks left to notify it. Since this is safe to do but may
            // not be strictly necessary, we should revisit whether we can remove this in the
            // future.
            val memoryManager = SparkEnv.get.memoryManager
            memoryManager.synchronized { memoryManager.notifyAll() }
          }
        } finally {
          // Though we unset the ThreadLocal here, the context member variable itself is still
          // queried directly in the TaskRunner to check for FetchFailedExceptions.
          TaskContext.unset()
        }
      }
    }
  }
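Because run() installs the context via TaskContext.setTaskContext, user code executing inside an operator can read it back through the public TaskContext API. A small sketch (assumes an existing SparkContext sc and Spark 2.4-style APIs):

  import org.apache.spark.TaskContext

  sc.parallelize(1 to 8, 4).foreachPartition { _ =>
    val ctx = TaskContext.get()                 // the context that Task.run() set up for this task
    println(s"stage=${ctx.stageId()} partition=${ctx.partitionId()} attempt=${ctx.attemptNumber()}")
    ctx.addTaskCompletionListener[Unit] { _ =>  // runs when markTaskCompleted fires the callbacks
      println(s"partition ${ctx.partitionId()} finished")
    }
  }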

Step 5: The runTask(context) method

  /**
   *  runTask is an abstract method, which means Task is only a template class: it merely encapsulates properties and methods shared by the subclasses and relies on them to provide the concrete behaviour
   *  As mentioned before, Task has two subclasses, ShuffleMapTask and ResultTask. Only with them can the defined operators and logic be executed
   */
  def runTask(context: TaskContext): T

Step 6: The runTask method of the ShuffleMapTask subclass
Source: org.apache.spark.scheduler.ShuffleMapTask.scala

/**
 * A ShuffleMapTask splits the elements of an rdd into multiple buckets,
 * based on the partitioner specified in the ShuffleDependency, which is HashPartitioner by default
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)
  with Logging {

......

  /**
   * ShuffleMapTask's runTask returns a MapStatus
   */
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    // Deserialize the data this task has to work on
    /**
     * Multiple tasks run concurrently in executors and the data may not all be on one machine, yet the tasks of one stage all process the same rdd. How does a task get hold of the data it has to process?
     * Answer: through a broadcast variable
     */
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Get the ShuffleManager
      val manager = SparkEnv.get.shuffleManager
      // Get a ShuffleWriter from the ShuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      /**
       * First, the rdd's iterator() method is called, passing in which partition this task has to process.
       * The core logic therefore lives in rdd.iterator(): that is where the operator or function we defined is executed against this particular partition of the rdd.
       *
       * Once our operator or function has run, the rdd partition has effectively been processed and there is returned data.
       * That returned data is written through the ShuffleWriter and, after being partitioned by HashPartitioner, ends up in the bucket of its corresponding partition
       */
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      /**
       * Finally, a MapStatus is returned. MapStatus wraps the data computed by the ShuffleMapTask and where it is stored, which is essentially the relevant BlockManager information.
       * BlockManager is Spark's low-level component for managing memory, data and disk
       */
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }
......

}
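The default bucket assignment mentioned above is simply the key's hash modulo the number of reduce-side partitions, kept non-negative. A quick illustration with the public HashPartitioner API:

  import org.apache.spark.HashPartitioner

  val partitioner = new HashPartitioner(4)
  // Each key is routed to bucket (key.hashCode mod numPartitions).
  Seq("spark", "task", "shuffle", "rdd").foreach { key =>
    println(s"$key -> bucket ${partitioner.getPartition(key)}")
  }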

Step 7: The rdd.iterator method

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      // CacheManager-related path (the rdd has been persisted)
      getOrCompute(split, context)
    } else {
      // Compute the rdd partition
      computeOrReadCheckpoint(split, context)
    }
  }
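Whether iterator() takes the cached path depends only on the RDD's storage level, which user code controls through persist()/cache(). A small sketch (assumes an existing SparkContext sc):

  import org.apache.spark.storage.StorageLevel

  val data = sc.parallelize(1 to 100)
  data.persist(StorageLevel.MEMORY_AND_DISK)   // storageLevel != NONE, so iterator() will use getOrCompute()
  data.count()                                 // first action computes the partitions and caches them
  data.count()                                 // second action reads them back from the BlockManager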

Step 8: The computeOrReadCheckpoint() method

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      // Abstract method; look at a concrete implementation such as MapPartitionsRDD
      compute(split, context)
    }
  }
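The checkpoint branch is taken once the RDD has been checkpointed and materialized; its first parent is then the internal RDD that reads the checkpoint files back. Setting this up from user code (the directory is a placeholder; sc is assumed):

  sc.setCheckpointDir("/tmp/spark-checkpoints")

  val doubled = sc.parallelize(1 to 1000).map(_ * 2)
  doubled.checkpoint()   // mark the rdd for checkpointing
  doubled.count()        // an action materializes the checkpoint files
  doubled.count()        // later computations read from the checkpoint instead of recomputing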

Step 9: The compute(split, context) method in MapPartitionsRDD
Source: org.apache.spark.rdd.MapPartitionsRDD.scala

  /**
   * Here the operator/function we defined for this rdd is executed against one of its partitions.
   * The f here can be thought of as the operator or function we defined ourselves, wrapped by Spark together with some additional logic.
   * Reaching this point means the custom computation is being run against the rdd's partition, and the data of the new rdd's partition is returned
   */
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
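For instance, rdd.map(g) is built as a MapPartitionsRDD whose f essentially wraps iter.map(g), with the TaskContext and partition index passed alongside. The equivalence is visible through the public API (assumes sc):

  val base = sc.parallelize(Seq(1, 2, 3, 4), 2)

  // What map(_ + 1) boils down to: a per-partition function of the form
  // (partitionIndex, iterator) => transformed iterator.
  val viaMap        = base.map(_ + 1)
  val viaPartitions = base.mapPartitionsWithIndex { (pid, iter) => iter.map(_ + 1) }

  assert(viaMap.collect().sameElements(viaPartitions.collect()))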

Step 10: The runTask() method of ResultTask
Source: org.apache.spark.scheduler.ResultTask.scala

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    // Basic deserialization
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    // Run the function we defined over the data produced by the rdd's iterator
    func(context, rdd.iterator(partition, context))
  }
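The func deserialized here is the per-partition function the action registered on the driver; collect(), for example, essentially runs (iter: Iterator[T]) => iter.toArray on every partition. The same mechanism is exposed directly through SparkContext.runJob (assumes sc):

  val rdd = sc.parallelize(1 to 10, 2)

  // Each element of sums is the output of one ResultTask, i.e. one per partition.
  val sums: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
  println(sums.mkString(", "))   // "15, 40" with the two partitions above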

Step 11: The execBackend.statusUpdate call from Step 2 reports the task's state back to the Driver, telling it that the Task has finished

// Calls the statusUpdate() method of the CoarseGrainedExecutorBackend that hosts the executor
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

Step 12: The statusUpdate() method of CoarseGrainedExecutorBackend

// Send a StatusUpdate message to the CoarseGrainedSchedulerBackend (on the Driver)
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

Step 13: Handling the StatusUpdate message in CoarseGrainedSchedulerBackend
Source: org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.scala

      case StatusUpdate(executorId, taskId, state, data) =>
        // Call TaskSchedulerImpl#statusUpdate to process the update
        scheduler.statusUpdate(taskId, state, data.value)
        // If the Task is in a finished state
        if (TaskState.isFinished(state)) {
          // Look up the ExecutorData by executor id
          executorDataMap.get(executorId) match {
            // If the executor is known
            case Some(executorInfo) =>
              // Give the executor back the cpu cores the task was using
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              // Make a new resource offer on this executor so further tasks can be launched
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

Step 14: The scheduler.statusUpdate method

  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
      try {
        Option(taskIdToTaskSetManager.get(tid)) match {
          case Some(taskSet) =>
            // Check whether the task is LOST. In practice lost tasks show up fairly often; it simply means the task failed for one of many possible reasons
            if (state == TaskState.LOST) {
              // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
              // where each executor corresponds to a single task, so mark the executor as failed.
              // Remove the executor and record it as failed
              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
              if (executorIdToRunningTaskIds.contains(execId)) {
                reason = Some(
                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
                removeExecutor(execId, reason.get)
                failedExecutor = Some(execId)
              }
            }
            if (TaskState.isFinished(state)) {
              // If the task has finished, remove it from the in-memory bookkeeping
              cleanupTaskState(tid)
              taskSet.removeRunningTask(tid)
              // Handle a normal finish (or a failure) accordingly
              if (state == TaskState.FINISHED) {
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates) or its " +
                "executor has been marked as failed.")
                .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      assert(reason.isDefined)
      dagScheduler.executorLost(failedExecutor.get, reason.get)
      backend.reviveOffers()
    }
  }

To sum up: task execution does not start by directly invoking some low-level run() method along the job -> stage -> taskSet -> task chain; it is organised through layering and division of labour. Task has two subclasses, ShuffleMapTask and ResultTask, each with its own role: a ShuffleMapTask performs the RDD transformations on the partition the task owns, while a ResultTask is triggered by an action, pulls the output of the ShuffleMapTask stage and applies the final operators and functions to truly process the data further. The two stages are connected by the MapOutputTracker.