1. 程式人生 > >Spark 2.x 提交原始碼淺析

Spark 2.x 提交原始碼淺析

大家都知道,spark job的提交是觸發了Action操作,現在我在RDD.scala中找到collect運算元,在這下面是有一個runjob方法

 def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

然後繼續進入runjob方法,發現是對封裝(rdd,func,partitions)等引數的runjob的一連串呼叫,最後發現是dagScheduler呼叫了runjob方法

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

繼續點進runjob,接下來進入到SparkContext最核心的DAGScheduler,這裡可以看到呼叫了一個submitJob方法,他是DAGScheduler的submitJob,它會提交一個Job任務,然後返回一個阻塞的執行緒等待Job完成

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

然後進入到submitJob方法, eventProcessLoop是DAGSchedulerEventProcessLoop的一個例項

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // 檢查分割槽是否存在保證Task正常執行
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
      //增加一個JobId作當前Job的標識
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // 如果沒有Task任務,將立即返回JobWaiter
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }
    //為分割槽做個判斷,確保分割槽大於0
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    //首先構造一個JobWaiter阻塞執行緒,等待Job完成,然後把結果提交給resultHandler
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    //eventProcessLoop是DAGScheduler的事件佇列
    //因為可能叢集執行著多個Job,而DAGScheduler預設是FIFO先進先出的資源排程
    //這裡傳入的事件型別是JobSubmitted,而在eventProcessLoop會呼叫doOnReceive
    //來匹配事件型別並執行對應的操作,最終會匹配到dagScheduler、handleJobSubmitted
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
  

進入到eventProcessLoop 裡面new DAGSchedulerEventProcessLoop(this)專門用來接收Job和Stage的發來的訊息

 private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
         //專門用來接收Job和Stage的發來的訊息
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  taskScheduler.setDAGScheduler(this)

DAGSchedulerEventProcessLoop是個類,這個類裡面呼叫了很關鍵的onRceive方法和doOnReceive方法 ,它會對裡面的事件不斷地迴圈呼叫,要處理Stage的劃分、shuffleMapTask和ResultTask的劃分,還有一些job任務的提交和型別

 override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      //在這裡注意一下,它會進行很關鍵的操作
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }
//然後通過模式匹配進行匹配哪個事件模型
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      //注意這裡是Stage劃分的精髓所在
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  

進入handleJobSubmitted方法,先建立resultStage,它是真正開始處理Job劃分Stage的事件,劃分ShufflemapStage後,ShufflemapStage裡面會劃分ShufflemapTask,每個job由一個ResultStage和0+個ShufflemapShuffle組成,finalRDD和分割槽、jobId都會被傳進finalStage

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    //建立ResultStage 這裡才是真正開始處理提交的job劃分stage的時候
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //每個job都是由1個ResultStage和0+個ShuffleMapStage組成
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

進入createResultStage方法,建立ResultStage的父Stage,將父stage的引數放入到new ResultStage()中,生成ResultStage

private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    //開始建立ResultStage的父stage
    //裡面有多個巢狀獲取shuffle依賴和迴圈建立shuffleMapStage,若沒有shuffle
    //操作則返回空list
    val parents = getOrCreateParentStages(rdd, jobId)
    //當前的stageId標識+1
    val id = nextStageId.getAndIncrement()
    //放入剛剛生成的父stage等核心引數,生成ResultStage
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    //把ResultStage和它的ID加入stageIdToList
    stageIdToStage(id) = stage
    //更新jobIds和jobIdToStageIds
    updateJobIdStageIdMaps(jobId, stage)
    //返回這個ResultStage
    stage
  }

進入到建立父Stage的方法getOrCreateParentStages

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
   //從getshuffleDependencies開始
    //這裡僅僅是抽取當前RDD的shuffle依賴
    //(job的stage是以shuffle劃分的,1個job中只會生成1個resultStage和0+個
    // shuffleMapStage,如果不是shuffleDependency就繼續抽取父RDD。。)
    //迭代遍歷一直到抽取出為止或者沒有
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

進入getOrCreateShuffleMapStage方法中,進行匹配能不能取到ParentStage的值,當沒有parentStage的時候會返回空,能取到就返回stage,ShuffleMapStage是根據遍歷出的ShuffleDependencies一次次創建出來的

private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    //通過從ShuffleDependency提取到的shuffleId來提取shuffleIdToMapStage中的
    //shuffleMapStage
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
        //如果能取到就直接返回
      case Some(stage) =>
        stage
//如果取不到就會依次找到所有父ShuffleDependencies並且構建
        //所有父ShuffleMapStage
      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          //根據遍歷出來的所有shuffleDependency依次建立所有父ShuffleMapStage
          //接下來進行判斷是否是父stage
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        //最後會建立當前ShuffleDependency的ShuffleMapStage
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

進入createShuffleMapStage方法 此方法是遞迴迴圈建立shuffleMapStage的過程

 def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    //ShuffleDependency的父RDD
    val rdd = shuffleDep.rdd
    //多少分割槽
    val numTasks = rdd.partitions.length
    //用父RDD迴圈呼叫,每次呼叫都是前一個父RDD
    //在這裡其實就會一直遞迴迴圈直到拿到首個stage才退出來
    //最後把生成的ShuffleMapStage加入shuffleIdToMapStage以便後面直接從中拿取
    val parents = getOrCreateParentStages(rdd, jobId)
    //標記當前StageId nextStageId+1
    val id = nextStageId.getAndIncrement()
    //拿到之前的stage等核心引數後就可以構建ShuffleMapStage了
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
   //把剛建立的ShuffleMapStage賦值給stageIdToStage
    stageIdToStage(id) = stage
    //賦值給shuffleIdToMapStage
    //若後面的程式碼再次生成對應的ShuffleMapStage就可以從shuffleIdToMapStage
    //中直接拿取了
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    //更新jobIds的jobIdToStageIds
    updateJobIdStageIdMaps(jobId, stage)
    //這裡會把shuffle資訊註冊到Driver上的MapOutputTrackerMaster的
    //shufflestatuses
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // A previously run stage generated partitions for this shuffle, so for each output
      // that's still available, copy information about that output location to the new stage
      // (so we don't unnecessarily re-compute that data).
      //把shuffle資訊註冊到自己Driver的MapOutputTrackerMaster
      //生成的是shuffleId和shuffleStatus的對映關係
      //在後面提交Job的時候還會根據它來的map stage是否已經準備好
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      //這裡建立好ShuffleMapStage後
      //可以看到把Shuffle資訊註冊到自己Driver的MapOutputTrackerMaster
      //的shuffleStatuses中,用來在後面的驗證和reduce端拉取map輸出

      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    //最後返回生成的ShuffleMapStage
    stage
  

這個時候ShuffleMapStage已經建立完成了,並不是一次就建立完成,而是遇見shuffle的時候會由下往上遞迴建立ShuffleMapStage

今天先分享到這裡,有理解錯誤的地方歡迎批評指正 !