1. 程式人生 > >Spark作業執行流程原始碼解析

Spark作業執行流程原始碼解析

目錄

  • 相關概念
  • 概述
  • 原始碼解析
    • 作業提交
    • 劃分&提交排程階段
    • 提交任務
    • 執行任務
    • 結果處理
  • Reference

本文梳理一下Spark作業執行的流程。

Spark作業和任務排程系統是其核心,通過內部RDD的依賴DAG,使得模組之間的呼叫和處理變得遊刃有餘。

相關概念

Job(作業):通過行動操作生成的一個或多個排程階段

Stage:根據依賴關係劃分的多個任務集,稱為排程階段,也叫做TaskSet(任務集)。劃分Stage是由DAGScheduler進行的,任務階段分為Shuffle Map Stage和Result Stage。

Task:是Spark執行計算的最小單位,會被分發到Executor中執行。

DAGScheduler:是面向排程階段的任務排程器,接收Spark應用提交的作業,根據依賴關係劃分stage,並提交給TaskScheduler。

TaskScheduler:是面向任務的 排程器,接收DAGScheduler劃分好的stage,傳送給Worker節點的Executor執行任務。

關於RDD相關知識、行動操作、寬窄依賴請參考Spark RDD基本概念、寬窄依賴、轉換行為操作

概述

Spark作業主要是根據我們編寫的業務處理程式碼,生成一系列相互依賴的排程階段,之後將排程階段中的任務提交Executor的執行的過程。

上圖是spark作業執行流程圖。主要分為四塊:

  • 構建DAG

    行動操作觸發提交作業,提交之後根據依賴關係構造DAG。

  • 劃分排程階段、提交排程階段

    DAGScheduler中根據寬依賴劃分排程階段(stage)。每個stage包含多個task,組成taskset提交給TaskScheduler執行

  • 通過叢集管理器啟動任務

    TaskScheduler收到DAGScheduler提交的任務集,以任務的形式一個個分發到Executor中進行執行。

  • Executor端執行任務,完成後儲存報告結果

    Executor接到任務後,扔到執行緒池中執行任務。任務完成後,報告結果給Driver。

原始碼解析

從以下的程式碼展開敘述:

def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "word-count", new SparkConf())
    val words = Seq("hello spark", "hello scala", "hello java")
    val rdd = sc.makeRDD(words)
    rdd
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .sortByKey()
    .foreach(println(_))
}

這是一個簡單的WordCount案例。首先根據序列生成RDD,再經過一系列運算元呼叫計算word的個數,之後再進行排序,輸出結果。

作業提交

上面的程式碼中,flatMap、map、reduceByKey、sortByKey都是轉化運算元,不會觸發計算;foreach是行動運算元,會提交作業,觸發計算。

看看foreach的內部的實現:

def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    // 將當前rdd引用和我們編寫的函式傳給sc.runJob
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
// 以下runJob函式都是SparkContext內部的過載函式
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    // 新增分割槽資訊
    runJob(rdd, func, 0 until rdd.partitions.length)
}
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
    // 建立一個數組來儲存結果
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
}
// 多次呼叫runJob,之後將呼叫DAGScheduler的runJob提交作業
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    // 任務成功後的處理函式
    resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    // 呼叫DAGScheduler.runJob提交作業
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
}

foreach內部呼叫了SparkContext.runJob()提交作業,SparkContext內部反覆呼叫了幾次過載的runJob方法。

runJob最終的引數中有當前rdd的引用、處理邏輯函式、分割槽數等,之後呼叫DagScheduler.runJob()提交作業。

現在再來到DagScheduler.runJob(),看看內部呼叫:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
    val start = System.nanoTime
    // 提交作業
    // waiter是等待DAGScheduler作業完成的物件。
    // 任務完成後,它將結果傳遞給給定的處理函式
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
        case scala.util.Success(_) =>
        case scala.util.Failure(exception) =>
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
}
// 提交job,劃分排程階段
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
    // 檢查以確保我們沒有在不存在的分割槽上啟動任務。
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
            "Total number of partitions: " + maxPartitions)
    }
    // 為當前job獲取id
    val jobId = nextJobId.getAndIncrement()
    // 如果分割槽為0,返回一個空job
    if (partitions.size == 0) {
        return new JobWaiter[U](this, jobId, 0, resultHandler)
    }
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    // 封裝waiter,用於在執行結束時,回撥處理結果
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // eventProcessLoop是用於提交/接收DAG排程事件的事件環
    // 提交作業,告知DAGScheduler開始劃分排程階段。
    eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, callSite, waiter,
        SerializationUtils.clone(properties)))
    waiter
}

內部呼叫了submitJob(),傳送提交作業的訊息到DAGScheduler的eventProcessLoop事件環中。

劃分&提交排程階段

eventProcessLoop是用於接收排程事件的排程環,對應的類是DAGSchedulerEventProcessLoop。

內部通過模式匹配接收訊息,作出相應處理。接收到提交作業的訊息後,呼叫dagScheduler.handleJobSubmitted()開始劃分排程階段、提交排程階段。

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // 匹配提交作業的訊息
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
}

看看dagScheduler.handleJobSubmitted()內部:

private[scheduler] def handleJobSubmitted(jobId: Int,
                                          finalRDD: RDD[_],
                                          func: (TaskContext, Iterator[_]) => _,
                                          partitions: Array[Int],
                                          callSite: CallSite,
                                          listener: JobListener,
                                          properties: Properties) {
    var finalStage: ResultStage = null
    try {
        // 根據依賴關係建立ResultStage
        finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
        ...
    }
    // 提交作業,清除內部資料
    barrierJobIdToNumTasksCheckFailures.remove(jobId)
    // 通過jobId, finalStage建立job
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    val jobSubmissionTime = clock.getTimeMillis()
    // 將job存入jobId對映到job的map中
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // 提交排程階段
    submitStage(finalStage)
}

handleJobSubmitted主要分為兩塊,一塊是根據依賴生成ResultStage,一塊是提交ResultStage。

生成ResultStage

先看一下生成ResultStage,也就是createResultStage方法。

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    // 先獲取當前rdd的父排程階段
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
}

會首先獲取當前RDD的父階段,獲取後根據父階段,建立ResultStage。

這裡注意一下,這裡的rdd是ShuffledRDD的引用。因為我們foreach觸發計算的時候,將呼叫rdd的引用傳了進來,也就是sortByKey生成的ShuffledRDD的引用。

接著看getOrCreateParentStages()是怎麼獲取當前RDD的父階段的:

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    // 獲取寬依賴,之後根據獲取的寬依賴,建立對應的ShuffleMapStage
    getShuffleDependencies(rdd).map { shuffleDep =>
        getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
}
// 獲取當前RDD的寬依賴
// 返回作為給定RDD的直接父級的shuffle依賴項
// 此函式將不會返回更遠的祖先。例如,如果C對B具有寬依賴性,而B對A具有寬依賴性
// A <-- B <-- C
// 用rdd C呼叫此函式只會返回B <-C依賴項。
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
        val toVisit = waitingForVisit.pop()
        if (!visited(toVisit)) {
            visited += toVisit
            toVisit.dependencies.foreach {
                case shuffleDep: ShuffleDependency[_, _, _] =>
                parents += shuffleDep
                case dependency =>
                waitingForVisit.push(dependency.rdd)
            }
        }
    }
    parents
}
// 如果shuffle map stage已在shuffleIdToMapStage中存在,則獲取
// 不存在的話,將建立shuffle map stage 
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
        case Some(stage) =>
            stage
        case None =>
            // 查詢尚未在shuffleToMapStage中註冊的祖先shuffle依賴項,
            // 併為它建立shuffle map stage
            getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
                if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
                    createShuffleMapStage(dep, firstJobId)
                }
            }
            // 為當前shuffle依賴建立shuffle map stage
            createShuffleMapStage(shuffleDep, firstJobId)
    }
}

getOrCreateParentStages中先呼叫getShuffleDependencies(),獲取當前RDD的寬依賴;獲取後,呼叫getOrCreateShuffleMapStage()為寬依賴建立stage(如果stage已存在就直接獲取)。

先說一下getShuffleDependencies方法,如程式碼註釋所說:返回作為給定RDD的直接父級的shuffle依賴項,不會返回整個DAG上所有的寬依賴。另外說一下,getShuffleDependencies這種寫法感覺極度舒適,之後還有一個方法也是這麼寫。

我們傳入的RDD是sortByKey生成的ShuffleRDD例項,呼叫getShuffleDependencies就會返回ShuffleDependency。


再說一下getOrCreateShuffleMapStage方法,它為返回的ShuffleDependency建立shuffle map stage。

它內部會在shuffleIdToMapStage中找當前ShuffleDependency是否存在stage,如果存在則返回,不存在則建立。

在建立之前,首先會呼叫getMissingAncestorShuffleDependencies()獲取當前依賴的所有祖先寬依賴,並判斷他們是否存在對應的排程階段,如果不存在則呼叫createShuffleMapStage()建立。確保所有祖先寬依賴都存在對應的排程階段後,呼叫createShuffleMapStage()為當前ShuffleDependency建立stage。

看看getMissingAncestorShuffleDependencies和createShuffleMapStage的實現:

// 查詢所有尚未在shuffleToMapStage中註冊的祖先shuffle依賴項
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
        val toVisit = waitingForVisit.pop()
        if (!visited(toVisit)) {
            visited += toVisit
            // 獲取寬依賴
            getShuffleDependencies(toVisit).foreach { shuffleDep =>
                if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
                    ancestors.push(shuffleDep)
                    waitingForVisit.push(shuffleDep.rdd)
                } 
            }
        }
    }
    ancestors
}
// 為shuffle依賴建立shuffle map stage 
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(
        id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

    stageIdToStage(id) = stage
    // 建立stage時會將stage放入shuffleId對映到stage的Map中
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)
    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
        mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
}

getMissingAncestorShuffleDependencies和getShuffleDependencies的實現方法類似,返回所有尚未在shuffleToMapStage中註冊的祖先shuffle依賴項。createShuffleMapStage為shuffle dependency建立shuffle map stage。


到此,getOrCreateParentStages的步驟就走完了,也就獲取到了當前rdd的父階段。

視線回到createResultStage方法中來:

val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)

將stageId、rdd、處理邏輯方法、分割槽、父排程階段等作為引數構造ResultStage。ResultStage就生成成功了。

提交ResultStage

在handleJobSubmitted方法中,呼叫submitStage()將生成的ResultStage提交。

看看submitStage內部:

// 提交階段,但首先遞迴提交所有丟失的父階段
private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
        logDebug("submitStage(" + stage + ")")
        // 如果當前階段不是在等待&不是在執行&沒有結束,開始執行
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing.isEmpty) {
                logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
                submitMissingTasks(stage, jobId.get)
            } else {
                for (parent <- missing) {
                    submitStage(parent)
                }
                waitingStages += stage
            }
        }
    } else {
        abortStage(stage, "No active job for stage " + stage.id, None)
    }
}

submitStage先呼叫getMissingParentStages獲取所有丟失的父階段。

如果沒有丟失的父階段,才會呼叫submitMissingTasks()提交當前階段的任務集;如果存在丟失的父階段,則遞迴呼叫submitStage先提交父階段。

getMissingParentStages的實現方式和getShuffleDependencies也類似,這裡就不看了,它的作用就是獲取所有丟失的父階段。

再大致說一下submitMissingTasks()是怎麼提交任務的:

val tasks: Seq[Task[_]] = try {
    stage match {
        case stage: ShuffleMapStage =>
            stage.pendingPartitions.clear()
            partitionsToCompute.map { id =>
                val locs = taskIdToLocations(id)
                val part = partitions(id)
                stage.pendingPartitions += id
                // 建立shuffleMapTask
                new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                                   taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                                   Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
            }

        case stage: ResultStage =>
            partitionsToCompute.map { id =>
                val p: Int = stage.partitions(id)
                val part = partitions(p)
                val locs = taskIdToLocations(id)
                // 建立ResultTask
                new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                               taskBinary, part, locs, id, properties, serializedTaskMetrics,
                               Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                               stage.rdd.isBarrier())
            }
    }
}


if (tasks.size > 0) {
    // 呼叫taskScheduler.submitTasks()提交task
    taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
}

submitMissingTasks內部根據ShuffleMapStage和ResultStage分別生成ShuffleMapTask和ResultTask。

之後將task封裝為TaskSet,呼叫TaskScheduler.submitTasks()提交任務。

到這裡,劃分和提交排程階段已經走完了。接下來開始看提交任務的原始碼。

提交任務

上面呼叫了TaskScheduler.submitTasks()提交任務,TaskScheduler是特質,真正方法實現在類TaskSchedulerImpl中,我們看看內部實現:

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    this.synchronized {
        // 為該TaskSet建立TaskSetManager,管理這個任務集的生命週期
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        val stage = taskSet.stageId
        val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
        stageTaskSets.foreach { case (_, ts) =>
            ts.isZombie = true
        }
        stageTaskSets(taskSet.stageAttemptId) = manager
        // 將該任務集的管理器加入到系統排程池中去,由系統統一排程
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

        if (!isLocal && !hasReceivedTask) {
            starvationTimer.scheduleAtFixedRate(new TimerTask() {
                override def run() {
                    if (!hasLaunchedTask) {
                        logWarning("Initial job has not accepted any resources; " +
                                   "check your cluster UI to ensure that workers are registered " +
                                   "and have sufficient resources")
                    } else {
                        this.cancel()
                    }
                }
            }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
        }
        hasReceivedTask = true
    }
    backend.reviveOffers()
}

會首先為每個TaskSet建立TaskSetManager用於管理整個TaskSet的生命週期,並呼叫schedulableBuilder.addTaskSetManager將任務集管理器新增到系統排程池中去,之後呼叫SchedulerBackend.reviveOffers()分配資源並執行

看一下SchedulerBackend的其中一個子類CoarseGrainedSchedulerBackend的實現:

override def reviveOffers() {
    // 向Driver傳送ReviveOffsers的訊息
    driverEndpoint.send(ReviveOffers)
}

內部會向Driver終端點發送ReviveOffers的訊息,分配資源並執行。

CoarseGrainedSchedulerBackend的例項就是代表Driver端的守護程序,其實也相當於自己發給自己。

接收到ReviveOffers的訊息後,會呼叫makeOffers()。

看看makeOffers()實現:

private def makeOffers() {
    val taskDescs = withLock {
        // 獲取叢集中可用的Executor列表
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map {
            case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
                            Some(executorData.executorAddress.hostPort))
        }.toIndexedSeq
        // 分配執行資源
        scheduler.resourceOffers(workOffers)
    }
    if (!taskDescs.isEmpty) {
        // 提交任務
        launchTasks(taskDescs)
    }
}

makeOffers()內部會先獲取所有可用的Executor列表,然後呼叫TaskSchedulerImpl.resourceOffers()分配資源,分配資源完成後,呼叫launchTask()提交任務。

看看TaskSchedulerImpl.resourceOffers()的實現:

// 由叢集管理器呼叫以在slave上提供資源。
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    //將每個slave標記為活動並記住其主機名, 還跟蹤是否添加了新的Executor
    var newExecAvail = false
    for (o <- offers) {
        if (!hostToExecutors.contains(o.host)) {
            hostToExecutors(o.host) = new HashSet[String]()
        }
        if (!executorIdToRunningTaskIds.contains(o.executorId)) {
            hostToExecutors(o.host) += o.executorId
            executorAdded(o.executorId, o.host)
            executorIdToHost(o.executorId) = o.host
            executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
            newExecAvail = true
        }
        for (rack <- getRackForHost(o.host)) {
            hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
        }
    }
    // 移除黑名單中的節點
    blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
    val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
        offers.filter { offer =>
            !blacklistTracker.isNodeBlacklisted(offer.host) &&
            !blacklistTracker.isExecutorBlacklisted(offer.executorId)
        }
    }.getOrElse(offers)
    // 為任務隨機分配Executor,避免任務集中分配到Worker上
    val shuffledOffers = shuffleOffers(filteredOffers)
    // 儲存已分配好的任務
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
    // 獲取按照排程策略排序好的TaskSetManager
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
        logDebug("parentName: %s, name: %s, runningTasks: %s".format(
            taskSet.parent.name, taskSet.name, taskSet.runningTasks))
        if (newExecAvail) {
            taskSet.executorAdded()
        }
    }

    // 為排好序的TaskSetManager列表進行分配資源。分配的原則是就近原則,按照順序為PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY
    for (taskSet <- sortedTaskSets) {
        if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
            ...
        } else {
            var launchedAnyTask = false
            val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
            for (currentMaxLocality <- taskSet.myLocalityLevels) {
                var launchedTaskAtCurrentMaxLocality = false
                do {
                    launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
                                                                                  currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
                    launchedAnyTask |= launchedTaskAtCurrentMaxLocality
                } while (launchedTaskAtCurrentMaxLocality)
            }
            ... 
        }
    }

    if (tasks.size > 0) {
        hasLaunchedTask = true
    }
    return tasks
}

resourceOffers中按照排程策略、就近原則為Task分配資源,返回分配好資源的Task。

分配好資源後,呼叫launchTasks()提交任務。

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
    for (task <- tasks.flatten) {
        // 序列化任務
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
           ...
        }
        else {
            val executorData = executorDataMap(task.executorId)
            executorData.freeCores -= scheduler.CPUS_PER_TASK
            // 向Executor所在節點終端傳送LaunchTask的訊息
            executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
    }
}

launchTasks內部先將任務序列化,之後把任務一個個的傳送到對應的CoarseGrainedExecutorBackend進行執行。

至此任務就提交完成了,接下來看Executor是如何執行任務的。

執行任務

CoarseGrainedExecutorBackend接收到LaunchTask訊息後,會呼叫Executor.launchTask()執行任務。

override def receive: PartialFunction[Any, Unit] = {
    case LaunchTask(data) =>
    if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // 呼叫Executor.launchTask執行任務
        executor.launchTask(this, taskDesc)
    }
}

看看Executor.launchTask的實現:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    // 將Task封裝到TaskRunner中
    val tr = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, tr)
    // 將TaskRunner扔到執行緒池中進行執行
    threadPool.execute(tr)
}

launchTask中會將Task封裝到TaskRunner中,然後把TaskRunner扔到執行緒池中進行執行。

TaskRunner是一個執行緒類,看一下它run方法的操作:

override def run(): Unit = {
    threadId = Thread.currentThread.getId
    Thread.currentThread.setName(threadName)
    val threadMXBean = ManagementFactory.getThreadMXBean
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
    } else 0L
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    // 開始執行
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStartTime: Long = 0
    var taskStartCpu: Long = 0
    startGCTime = computeTotalGcTime()

    try {
        // 反序列化任務
        task = ser.deserialize[Task[Any]](
            taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
        // value是返回結果
        val value = Utils.tryWithSafeFinally {
            // 呼叫Task.run執行Task,並獲取返回結果
            val res = task.run(
                taskAttemptId = taskId,
                attemptNumber = taskDescription.attemptNumber,
                metricsSystem = env.metricsSystem)
            threwException = false
            res
        } {
            val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
            val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()     
        }

        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()
        val directResult = new DirectTaskResult(valueBytes, accumUpdates)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit()

        // 執行結果的處理
        val serializedResult: ByteBuffer = {
            // 結果大於maxResultSize,直接丟棄;這個值通過spark.driver.maxResultSize進行設定
            if (maxResultSize > 0 && resultSize > maxResultSize) {
                ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
            
            }
            // 結果大於maxDirectResultSize,存放到BlockManager中,然後將BlockId傳送到Driver
            else if (resultSize > maxDirectResultSize) {
                val blockId = TaskResultBlockId(taskId)
                env.blockManager.putBytes(
                    blockId,
                    new ChunkedByteBuffer(serializedDirectResult.duplicate()),
                    StorageLevel.MEMORY_AND_DISK_SER)
                ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
            }
            // 直接將結果發到Driver
            else {
                serializedDirectResult
            }
        }
        // 任務執行完成,呼叫CoarseGrainedExecutorBackend.statusUpdate
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
        ...
    } finally {
        runningTasks.remove(taskId)
    }
}

run方法中,會將任務反序列化,然後呼叫Task.run()執行Task;執行完成後獲取執行結果,根據結果的大小分情況處理,之後呼叫CoarseGrainedExecutorBackend.statusUpdate()向Driver彙報執行結果。


Task的run方法中,會呼叫runTask()執行任務。

Task是抽象類,沒有對runTask()進行實現。具體的實現是由ShuffleMapTask和ResultTask進行的。

先看看ShuffleMapTask的runTask的實現:

override def runTask(context: TaskContext): MapStatus = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
    } else 0L
    // 反序列化
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    
    var writer: ShuffleWriter[Any, Any] = null
    try {
        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
       // 執行計算,並將結果寫入本地系統的BlockManager中
        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        // 關閉writer,返回計算結果
        // 返回包含了資料的location和size元資料資訊的MapStatus資訊
        writer.stop(success = true).get
    } catch {
    }
}

ShuffleMapTask會將計算結果寫入到BlockManager中,最終會返回包含相關元資料資訊的MapStatus。MapStatus將成為下一階段獲取輸入資料時的依據。

再看看ResultTask的runTask的實現:

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // 反序列化
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L
    
    // 執行func進行計算
    func(context, rdd.iterator(partition, context))
}

ResultTask會直接執行封裝進來的func函式,返回計算結果。


執行完成後,呼叫CoarseGrainedExecutorBackend.statusUpdate()。statusUpdate方法中向Driver終端點發送StatusUpdate的訊息彙報任務執行結果。

結果處理

Driver接到StatusUpdate訊息後,呼叫TaskSchedulerImpl.statusUpdate()進行處理

override def receive: PartialFunction[Any, Unit] = {
    case StatusUpdate(executorId, taskId, state, data) =>
        // 呼叫statusUpdate處理
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
            executorDataMap.get(executorId) match {
                case Some(executorInfo) =>
                    executorInfo.freeCores += scheduler.CPUS_PER_TASK
                    makeOffers(executorId)
                case None =>
                    》。 
            }
        }
}

看看statusUpdate方法:

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
        try {
            taskIdToTaskSetManager.get(tid) match {
                case Some(taskSet) =>
                // 如果FINISHED,呼叫taskResultGetter.enqueueSuccessfulTask()
                if (TaskState.isFinished(state)) {
                    cleanupTaskState(tid)
                    taskSet.removeRunningTask(tid)
                    if (state == TaskState.FINISHED) {
                        taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
                    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                        taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
                    }
                }
                case None =>
                    ....
            }
        } catch {
        }
    }
}

statusUpdate內部會根據任務的狀態不同做不同處理,這裡只說一下任務是FINISHED的情況。

如果狀態是TaskState.FINISHED,呼叫TaskResultGetter的enqueueSuccessfulTask方法處理。

enqueueSuccessfulTask內部根據結果型別進行處理。如果是IndirectTaskResult,通過blockid從遠端獲取結果;如果DirectTaskResult,那麼無需遠端獲取。


如果任務是ShuffleMapTask,需要將結果告知下游排程階段,以便作為後續排程階段的輸入。

這個是在DAGScheduler的handleTaskCompletion中實現的,將MapStatus註冊到MapOutputTrackerMaster中,從而完成ShuffleMapTask的處理

如果任務是ResultTask,如果完成,直接標記作業已經完成。


至此整個流程就走了一遍了。

在任務資源分配和結果處理說的有點不清晰,但對於瞭解整個任務執行流程沒有很大影響。

end.

以上是結合看書以及看原始碼寫的流程,如有偏差,歡迎交流指正。


Reference

《圖解Spark核心技術與案例實戰》



個人公眾號:碼農峰,定時推送行業資訊,持續釋出原創技術文章,歡迎大家關