1. 程式人生 > 其它 >Spark原始碼系列(三)作業執行過程

Spark原始碼系列(三)作業執行過程

作業執行

上一章講了RDD的轉換,但是沒講作業的執行,它和Driver Program的關係是啥,和RDD的關係是啥?

官方給的例子裡面,一執行collect方法就能出結果,那我們就從collect開始看吧,進入RDD,找到collect方法。

  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

它進行了兩個操作:

1、呼叫SparkContext的runJob方法,把自身的引用傳入去,再傳了一個匿名函式(把Iterator轉換成Array陣列)

2、把result結果合併成一個Array,注意results是一個Array[Array[T]]型別,所以第二句的那個寫法才會那麼奇怪。這個操作是很重的一個操作,如果結果很大的話,這個操作是會報OOM的,因為它是把結果儲存在Driver程式的記憶體當中的result數組裡面。

我們點進去runJob這個方法吧。

    val callSite = getCallSite
    val cleanedFunc = clean(func)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)
    rdd.doCheckpoint()

追蹤下去,我們會發現經過多個不同的runJob同名函式呼叫之後,執行job作業靠的是dagScheduler,最後把結果通過resultHandler儲存返回。

DAGScheduler如何劃分作業

好的,我們繼續看DAGScheduler的runJob方法,提交作業,然後等待結果,成功什麼都不做,失敗丟擲錯誤,我們接著看submitJob方法。

    val jobId = nextJobId.getAndIncrement()
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    // 記錄作業成功與失敗的資料結構,一個作業的Task數量是和分片的數量一致的,Task成功之後呼叫resultHandler儲存結果。
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)

走到這裡,感覺有點兒繞了,為什麼到了這裡,還不直接執行呢,還要給eventProcessActor傳送一個JobSubmitted請求呢,new一個執行緒和這個區別有多大?

不管了,搜尋一下eventProcessActor吧,結果發現它是一個DAGSchedulerEventProcessActor,它的定義也在DAGScheduler這個類裡面。它的receive方法裡面定義了12種事件的處理方法,這裡我們只需要看

JobSubmitted的就行,它也是呼叫了自身的handleJobSubmitted方法。但是這裡很奇怪,沒辦法打斷點除錯,但是它的結果倒是能返回的,因此我們得用另外一種方式,開啟test工程,找到scheduler目錄下的DAGSchedulerSuite這個類,我們自己寫一個test方法,首先我們要在import那裡加上import org.apache.spark.SparkContext._  ,然後加上這一段測試程式碼。

  test("run shuffle") {
    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
    val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
    val rdd4 = rdd3.reduceByKey(_ + _)
    submit(rdd4, Array(0,1,2,3))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 1))))
    complete(taskSets(1), Seq((Success, 42)))
    complete(taskSets(2), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))
    complete(taskSets(3), Seq((Success, 68)))
  }

這個例子的重點還是shuffle那塊,另外也包括了map的多個轉換,大家可以按照這個例子去測試下。

我們接著看handleJobSubmitted吧。

    var finalStage: Stage = null
    try {
      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
    } catch {
      // 錯誤處理,告訴監聽器作業失敗,返回....
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
        // 很短、沒有父stage的本地操作,比如 first() or take() 的操作本地執行.
        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
        runLocally(job)
      } else {
        // collect等操作走的是這個過程,更新相關的關係對映,用監聽器監聽,然後提交作業
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        resultStageToJob(finalStage) = job
        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()

從上面這個方法來看,我們應該重點關注newStage方法、submitStage方法和submitWaitingStages方法。

我們先看newStage,它得到的結果叫做finalStage,挺奇怪的哈,為啥?先看吧

    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stageToInfos(stage) = StageInfo.fromStage(stage)
    stage

可以看出來Stage也沒有太多的東西可言,它就是把rdd給傳了進去,tasks的數量,shuffleDep是空,parentStage。

那它的parentStage是啥呢?

  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // 在visit函式裡面,只有存在ShuffleDependency的,parent才通過getShuffleMapStage計算出來
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_,_] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              visit(dep.rdd)
          }
        }
      }
    }
    visit(rdd)
    parents.toList
  }

它是通過不停的遍歷它之前的rdd,如果碰到有依賴是ShuffleDependency型別的,就通過getShuffleMapStage方法計算出來它的Stage來。

那我們就開始看submitStage方法吧。

  private def submitStage(stage: Stage) {
        //...
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) {
          // 沒有父stage,執行這stage的tasks
          submitMissingTasks(stage, jobId.get)
          runningStages += stage
        } else {
         // 提交父stage的task,這裡是個遞迴,真正的提交在上面的註釋的地方
          for (parent <- missing) {
            submitStage(parent)
          }
          // 暫時不能提交的stage,先新增到等待佇列
          waitingStages += stage
        }
      }
  }

這個提交stage的過程是一個遞迴的過程,它是先要把父stage先提交,然後把自己新增到等待佇列中,直到沒有父stage之後,就提交該stage中的任務。等待佇列在最後的submitWaitingStages方法中提交。

這裡我引用一下上一章當中我所畫的那個圖來表示這個過程哈。

從getParentStages方法可以看出來,RDD當中存在ShuffleDependency的Stage才會有父Stage, 也就是圖中的虛線的位置!

所以我們只需要記住凡是涉及到shuffle的作業都會至少有兩個Stage,即shuffle前和shuffle後。

TaskScheduler提交Task

那我們接著看submitMissingTasks方法,下面是主體程式碼。

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
    myPending.clear()
    var tasks = ArrayBuffer[Task[_]]()
    if (stage.isShuffleMap) {
      // 這是shuffle stage的情況
      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
        val locs = getPreferredLocs(stage.rdd, p)
        tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
      }
    } else {
      // 這是final stage的情況
      val job = resultStageToJob(stage)
      for (id <- 0 until job.numPartitions if !job.finished(id)) {
        val partition = job.partitions(id)
        val locs = getPreferredLocs(stage.rdd, partition)
        tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
      }
    }
    if (tasks.size > 0) {
      myPending ++= tasks
      taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
    } else {
      runningStages -= stage
    }
  }

Task也是有兩類的,一種是ShuffleMapTask,一種是ResultTask,我們需要注意這兩種Task的runTask方法。最後Task是通過taskScheduler.submitTasks來提交的。

我們找到TaskSchedulerImpl裡面看這個方法。

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasksthis.synchronized {
      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

排程器有兩種模式,FIFO和FAIR,預設是FIFO, 可以通過spark.scheduler.mode來設定,schedulableBuilder也有相應的兩種FIFOSchedulableBuilder和FairSchedulableBuilder。

那backend是啥?據說是為了給TaskSchedulerImpl提供外掛式的排程服務的。

它是怎麼例項化出來的,這裡我們需要追溯回到SparkContext的createTaskScheduler方法,下面我直接把常用的3中型別的TaskScheduler給列出來了。

mode            Scheduler                          Backend

cluster          TaskSchedulerImpl             SparkDeploySchedulerBackend

yarn-cluster  YarnClusterScheduler          CoarseGrainedSchedulerBackend

yarn-client    YarnClientClusterScheduler  YarnClientSchedulerBackend

好,我們回到之前的程式碼上,schedulableBuilder.addTaskSetManager比較簡單,把作業集新增到排程器的隊列當中。

我們接著看backend的reviveOffers,裡面只有一句話driverActor ! ReviveOffers。真是頭暈,搞那麼多Actor,只是為了接收訊息。。。

照舊吧,找到它的receive方法,找到ReviveOffers這個case,發現它呼叫了makeOffers方法,我們繼續追殺!

def makeOffers() {
    launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

從executorHost中隨機抽出一些來給排程器,然後排程器返回TaskDescription,executorHost怎麼來的,待會兒再說,我們接著看resourceOffers方法。

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    SparkEnv.set(sc.env)

    // 遍歷worker提供的資源,更新executor相關的對映
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
      }
    }

    // 從worker當中隨機選出一些來,防止任務都堆在一個機器上
    val shuffledOffers = Random.shuffle(offers)
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue

    // 隨機遍歷抽出來的worker,通過TaskSetManager的resourceOffer,把本地性最高的Task分給Worker
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
      do {
        launchedTask = false
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          if (availableCpus(i) >= CPUS_PER_TASK) {
            // 把本地性最高的Task分給Worker
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              tasks(i) += task
              val tid = task.taskId
              taskIdToTaskSetId(tid) = taskSet.taskSet.id
              taskIdToExecutorId(tid) = execId
              activeExecutorIds += execId
              executorsByHost(host) += execId
              availableCpus(i) -= CPUS_PER_TASK
              assert (availableCpus(i) >= 0)
              launchedTask = true
            }
          }
        }
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

resourceOffers主要做了3件事:

1、從Workers裡面隨機抽出一些來執行任務。

2、通過TaskSetManager找出和Worker在一起的Task,最後編譯打包成TaskDescription返回。

3、將Worker-->Array[TaskDescription]的對映關係返回。

我們繼續看TaskSetManager的resourceOffer,看看它是怎麼找到和host再起的Task,並且包裝成TaskDescription。

通過檢視程式碼,我發現之前我解釋的和它具體實現的差別比較大,它所謂的本地性是根據當前的等待時間來確定的任務本地性的級別。

它的本地性主要是包括四類:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。

  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
        currentLocalityIndex < myLocalityLevels.length - 1)
    {
      // 成立條件是當前時間-上次釋出任務的時間 > 當前本地性級別的,條件成立就跳到下一個級別
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    myLocalityLevels(currentLocalityIndex)
  }

等待時間是可以通過引數去設定的,具體的自己查下面的程式碼。

  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = conf.get("spark.locality.wait", "3000")
    level match {
      case TaskLocality.PROCESS_LOCAL =>
        conf.get("spark.locality.wait.process", defaultWait).toLong
      case TaskLocality.NODE_LOCAL =>
        conf.get("spark.locality.wait.node", defaultWait).toLong
      case TaskLocality.RACK_LOCAL =>
        conf.get("spark.locality.wait.rack", defaultWait).toLong
      case TaskLocality.ANY =>
        0L
    }
  }

下面繼續看TaskSetManager的resourceOffer的方法,通過findTask來從Task集合裡面找到相應的Task。

      findTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality)) => {
             val task = tasks(index)
             val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
            val timeTaken = clock.getTime() - startTime
            addRunningTask(taskId)
            val taskName = "task %s:%d".format(taskSet.id, index)
            sched.dagScheduler.taskStarted(task, info)
            return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
        }

它的findTask方法如下:

  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
   // 同一個Executor,通過execId來查詢相應的等待的task
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }
   // 通過主機名找到相應的Task,不過比之前的多了一步判斷
    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }
  // 通過Rack的名稱查詢Task
    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }
   // 查詢那些preferredLocations為空的,不指定在哪裡執行的Task來執行
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }
  // 查詢那些preferredLocations為空的,不指定在哪裡執行的Task來執行
    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }
    // 最後沒辦法了,拖的時間太長了,只能啟動推測執行了
    findSpeculativeTask(execId, host, locality)
  }

從這個方面可以看得出來,Spark對執行時間還是很注重的,等待的時間越長,它就可能越飢不擇食,從PROCESS_LOCAL一直讓步到ANY,最後的最後,推測執行都用到了。

找到任務之後,它就呼叫dagScheduler.taskStarted方法,通知dagScheduler任務開始了,taskStarted方法就不詳細講了,它觸發dagScheduler的BeginEvent事件,裡面只做了2件事:

1、檢查Task序列化的大小,超過100K就警告。

2、提交等待的Stage。

好,我們繼續回到釋出Task上面來,中間過程講完了,我們應該是要回到CoarseGrainedSchedulerBackend的launchTasks方法了。

def makeOffers() {
    launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

它的方法體是:

    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
        executorActor(task.executorId) ! LaunchTask(task)
      }
    }

通過executorId找到相應的executorActor,然後傳送LaunchTask過去,一個Task佔用一個Cpu。

註冊Application

那這個executorActor是怎麼來的呢?找唄,最後發現它是在receive方法裡面接受到RegisterExecutor訊息的時候註冊的。通過搜尋,我們找到CoarseGrainedExecutorBackend這個類,在它的preStart方法裡面赫然找到了driver ! RegisterExecutor(executorId, hostPort, cores)  帶的這三個引數都是在初始化的時候傳入的,那是誰例項化的它呢,再逆向搜尋找到SparkDeploySchedulerBackend!之前的backend一直都是它,我們看reviveOffers是在它的父類CoarseGrainedSchedulerBackend裡面。

關係清楚了,在這個backend的start方法裡面啟動了一個AppClient,AppClient的其中一個引數ApplicationDescription就是封裝的執行CoarseGrainedExecutorBackend的命令。AppClient內部啟動了一個ClientActor,這個ClientActor啟動之後,會嘗試向Master傳送一個指令actor ! RegisterApplication(appDescription) 註冊一個Application。

別廢話了,Ctrl +Shift + N吧,定位到Master吧。

    case RegisterApplication(description) => {
        val app = createApplication(description, sender)
        registerApplication(app)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
    }

它做了5件事:

1、createApplication為這個app構建一個描述App資料結構的ApplicationInfo。

2、註冊該Application,更新相應的對映關係,新增到等待佇列裡面。

3、用persistenceEngine持久化Application資訊,預設是不儲存的,另外還有兩種方式,儲存在檔案或者Zookeeper當中。

4、通過傳送方註冊成功。

5、開始作業排程。

關於排程的問題,在第一章《spark-submit提交作業過程》已經介紹過了,建議回去再看看,搞清楚Application和Executor之間的關係。

Application一旦獲得資源,Master會發送launchExecutor指令給Worker去啟動Executor。

進到Worker裡面搜尋LaunchExecutor。

  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
  executors(appId + "/" + execId) = manager
  manager.start()
   coresUsed += cores_
   memoryUsed += memory_
   masterLock.synchronized {
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
   }

原來ExecutorRunner還不是傳說中的Executor,它內部是執行了appDesc內部的那個命令,啟動了CoarseGrainedExecutorBackend,它才是我們的真命天子Executor。

啟動之後ExecutorRunner報告ExecutorStateChanged事件給Master。

Master幹了兩件事:

1、轉發給Driver,這個Driver是之前註冊Application的那個AppClient

2、如果是Executor執行結束,從相應的對映關係裡面刪除

釋出Task

上面又花了那麼多時間講Task的執行環境ExecutorRunner是怎麼註冊,那我們還是回到我們的主題,Task的釋出。

釋出任務是傳送LaunchTask指令給CoarseGrainedExecutorBackend,接受到指令之後,讓它內部的executor來發布這個任務。

這裡我們看一下Executor的launchTask。

  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

TaskRunner是這裡的重頭戲啊!看它的run方法吧。

    override def run() {
      // 準備工作若干...那天我們放學回家經過一片玉米地,以上省略一百字

      try {
        // 反序列化Task
        SparkEnv.set(env)
        Accumulators.clear()
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

       // 命令為嘗試執行,和hadoop的mapreduce作業是一致的 
        attemptedTask = Some(task)
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        // 執行Task, 具體可以去看之前讓大家關注的ResultTask和ShuffleMapTask
        taskStart = System.currentTimeMillis()
        val value = task.run(taskId.toInt)
        val taskFinish = System.currentTimeMillis()

     // 對結果進行序列化
        val resultSer = SparkEnv.get.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()
     // 更新任務的相關監控資訊,會反映到監控頁面上的
        for (m <- task.metrics) {
          m.hostname = Utils.localHostName()
          m.executorDeserializeTime = taskStart - startTime
          m.executorRunTime = taskFinish - taskStart
          m.jvmGCTime = gcTime - startGCTime
          m.resultSerializationTime = afterSerialization - beforeSerialization
        }

        val accumUpdates = Accumulators.values
     // 對結果進行再包裝,包裝完再進行序列化
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
        val serializedDirectResult = ser.serialize(directResult)
        // 如果中間結果的大小超過了spark.akka.frameSize(預設是10M)的大小,就要提升序列化級別了,超過記憶體的部分要儲存到硬碟的
        val serializedResult = {
          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            ser.serialize(new IndirectTaskResult[Any](blockId))
          } else {
            serializedDirectResult
          }
        }
     // 返回結果
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        // 這部分是錯誤處理,被我省略掉了,主要內容是通關相關負責人處理後事
      } finally {
        // 清理為ResultTask註冊的shuffle記憶體,最後把task從正在執行的列表當中刪除
        val shuffleMemoryMap = env.shuffleMemoryMap
        shuffleMemoryMap.synchronized {
          shuffleMemoryMap.remove(Thread.currentThread().getId)
        }
        runningTasks.remove(taskId)
      }
    }
  }

以上程式碼被我這些了,但是建議大家看看註釋吧。

最後結果是通過statusUpdate返回的。

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    driver ! StatusUpdate(executorId, taskId, state, data)
  }

這回這個Driver又不是剛才那個AppClient,而是它的家長SparkDeploySchedulerBackend,是在SparkDeploySchedulerBackend的父類CoarseGrainedSchedulerBackend接受了這個StatusUpdate訊息。

這關係真他娘夠亂的。。

繼續,Task裡面走的是TaskSchedulerImpl這個方法。

scheduler.statusUpdate(taskId, state, data.value)

到這裡,一個Task就執行結束了,後面就不再擴充套件了,作業執行這塊是Spark的核心,再擴充套件基本就能寫出來一本書了,限於文章篇幅,這裡就不再深究了。

以上的過程應該是和下面的圖一致的。

看完這篇文章,估計大家會雲裡霧裡的,在下一章《作業生命週期》會把剛才描述的整個過程重新梳理出來,便於大家記憶,敬請期待!