Spark Execution Flow

This post walks through Spark's job execution flow by reading the source code, with the book 《圖解Spark核心技術與案例實戰》 as a companion reference.
The program being analysed is the simple word-count job below.

public static void main(String[] args) {
    String logFile = "wordcount.txt";
    SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();
    JavaRDD<String> words = logData.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) throws Exception {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> wordPair = words.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    });
    // action: triggers the actual computation
    List<Tuple2<String, Integer>> counts = wordCount.collect();
    for (Tuple2<String, Integer> stringIntegerTuple2 : counts) {
        System.out.println(stringIntegerTuple2._1 + ":" + stringIntegerTuple2._2);
    }
    sc.stop();
}

// wordcount.txt
// one two two three three three four four four four five five five five five six six six six six six
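
Since the rest of this post reads Spark's Scala sources, here is a rough Scala equivalent of the same program for comparison. This is only a sketch written for this post (the object name is a placeholder), not code from the book.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val counts = sc.textFile("wordcount.txt")
      .flatMap(_.split(" "))   // transformation: only extends the DAG
      .map(word => (word, 1))  // transformation
      .reduceByKey(_ + _)      // wide dependency: introduces a shuffle
      .collect()               // action: triggers the actual job
    counts.foreach { case (word, n) => println(word + ":" + n) }
    sc.stop()
  }
}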

Key Classes

First, a few important classes are introduced so that the exploration that follows has a clear direction. The descriptions only cover what matters for the runtime analysis; they are not meant to be complete.

RDD

The real computation only starts once an action is invoked on an RDD; this is the classic lazy-evaluation model. In the code above, execution properly begins inside collect(), while the earlier flatMap() and mapToPair() calls merely build up the DAG.
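
A quick way to observe the lazy model, e.g. in spark-shell where a SparkContext named sc is already available (a small sketch, not from the book):

val mapped = sc.parallelize(1 to 4).map { x => println("computing " + x); x * 2 }
// nothing has been printed yet: map() only recorded the transformation in the DAG
println(mapped.collect().mkString(","))   // collect() is the action; only now does the map function run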

SparkContext

SparkContext is the core of the Driver. Calls made on an RDD ultimately end up in SparkContext, which relies on the DAGScheduler and the TaskScheduler to parse the DAG and schedule the resulting work.

DAGScheduler

The DAGScheduler is responsible for parsing the DAG and splitting it into stages. A stage can be viewed as a group of tasks that can run in parallel. In this example, flatMap() and the following mapToPair() fall into one stage, while reduceByKey() belongs to another. Stages are split at wide (shuffle) dependencies. Once the split is done, the DAGScheduler wraps each stage into a set of tasks and sends them to the TaskScheduler.
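
One way to see this split in the example itself is to print the lineage of the final RDD: the indentation step in toDebugString marks the ShuffledRDD introduced by reduceByKey, which is exactly where the stage boundary falls. A sketch, assuming the wordCount RDD from the program above (the exact output varies with the Spark version):

println(wordCount.toDebugString)
// roughly:
// (2) ShuffledRDD[4] at reduceByKey ...          <- second stage (after the shuffle)
//  +-(2) MapPartitionsRDD[3] at mapToPair ...    <- first stage
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  wordcount.txt MapPartitionsRDD[1] at textFile ...
//     |  wordcount.txt HadoopRDD[0] at textFile ...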

TaskScheduler

The TaskScheduler's job is to schedule tasks: after receiving the tasks sent over by the DAGScheduler, it distributes them to Executors for execution.

ResultStage and ShuffleMapStage

ResultStage and ShuffleMapStage are the two kinds of stages that DAG parsing produces. The ResultStage is the stage built around the final RDD, while a ShuffleMapStage is a stage that ends at a shuffle. The operations on the two sides of a shuffle cannot live in the same stage (a wide dependency means they cannot run in parallel), so the DAG is cut at every shuffle. A job without any shuffle therefore has a single ResultStage; a job with shuffles has at least one ShuffleMapStage in addition to its ResultStage.
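
The split between the two kinds of stage can also be checked directly through rdd.dependencies: a narrow dependency (e.g. OneToOneDependency) keeps two RDDs in the same stage, while a ShuffleDependency forces a new ShuffleMapStage. A sketch, reusing an existing SparkContext named sc:

val wordsRdd  = sc.textFile("wordcount.txt").flatMap(_.split(" "))
val countsRdd = wordsRdd.map(w => (w, 1)).reduceByKey(_ + _)

println(wordsRdd.dependencies.map(_.getClass.getSimpleName))   // List(OneToOneDependency): narrow, same stage
println(countsRdd.dependencies.map(_.getClass.getSimpleName))  // List(ShuffleDependency): wide, new ShuffleMapStage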

The Execution Process

The Submission Phase

Submitting the DAG to the DAGScheduler

In the example, calling collect() is what triggers job execution. The first phase is submission; the call chain inside collect() is as follows.

RDD.scala
  /**
   * Return an array that contains all of the elements in this RDD.
   * collect() starts by calling SparkContext's runJob method
   */
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
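
So collect() just asks runJob for one array per partition and concatenates them. Spelled out by hand with the public runJob overload that takes a per-partition function (a sketch, reusing an existing SparkContext named sc):

val rdd = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
// one Array[String] per partition, computed by the job
val perPartition: Array[Array[String]] = sc.runJob(rdd, (iter: Iterator[String]) => iter.toArray)
// flattening the per-partition results reproduces collect()
val collected: Array[String] = Array.concat(perPartition: _*)
println(collected.mkString(","))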

SparkContext.scala

	/**
	* After a few hops inside SparkContext, the call lands in this method; as the body shows, it mainly delegates to dagScheduler.runJob
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

DAGScheduler.scala
A thin wrapper around the next call: it only adds log output and has no effect on the actual execution.

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    // submitJob posts the job onto the event queue and returns a JobWaiter
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

DAGScheduler.scala
Here an event-loop helper class comes into play: submitJob wraps the request as a JobSubmitted event and posts it onto the event loop (a simplified sketch of this hand-off follows the code below).

/**
   * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
   * can be used to block until the the job finishes executing or can be used to cancel the job.
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
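
Note that submitJob itself runs nothing: it registers a JobWaiter, posts a JobSubmitted event, and returns immediately, while runJob blocks on waiter.awaitResult(). A stripped-down sketch of this hand-off (toy classes written for this post, not Spark's):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// submit() returns a waiter right away; whoever finishes the "job" completes it,
// and the synchronous caller blocks on awaitResult() -- the same shape as
// DAGScheduler.runJob / submitJob / JobWaiter.
class ToyJobWaiter[T] {
  private val promise = Promise[T]()
  def jobFinished(result: T): Unit = promise.success(result)
  def jobFailed(e: Exception): Unit = promise.failure(e)
  def awaitResult(): T = Await.result(promise.future, Duration.Inf)
}

object ToyScheduler {
  def submitJob(work: () => Int): ToyJobWaiter[Int] = {
    val waiter = new ToyJobWaiter[Int]
    // in Spark this is eventProcessLoop.post(JobSubmitted(...)); here a plain thread stands in
    new Thread(new Runnable {
      def run(): Unit = waiter.jobFinished(work())
    }).start()
    waiter
  }

  def runJob(work: () => Int): Int = submitJob(work).awaitResult()   // blocks like DAGScheduler.runJob
}

println(ToyScheduler.runJob(() => (1 to 10).sum))   // prints 55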

DAGScheduler.scala
The event is received here. This is a producer/consumer pattern: the eventProcessLoop above is backed by a blocking queue, and this handler dispatches on the type of event taken from the queue. The method gives a good overview of what the DAGScheduler does at each point in a job's life cycle (a minimal sketch of the event-loop pattern itself follows the handler below).

/**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
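
For completeness, this is the shape of the event loop the handler above plugs into: post() puts events onto a blocking queue, and a single dedicated thread drains the queue and calls onReceive for each event. The sketch below is a simplified model written for this post, not Spark's EventLoop class:

import java.util.concurrent.LinkedBlockingDeque

abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(queue.take())   // take() blocks until a producer posts an event
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
    }
  }
  eventThread.setDaemon(true)

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = queue.put(event)   // producers (e.g. submitJob) call this

  protected def onReceive(event: E): Unit       // consumer side: the dispatcher shown above
}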

DAGScheduler parses the DAG into stages

The transformation from RDDs into stages happens in handleJobSubmitted, mainly inside the newResultStage helper it calls.

  /**
   * Create a ResultStage -- either directly for use as a result stage, or as part of the
   * (re)-creation of a shuffle map stage in newOrUsedShuffleStage.  The stage will be associated
   * with the provided jobId.
   */
  private def newResultStage(
      rdd: RDD[_],
      numTasks: Int,
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

    stageIdToStage(id) = stage
    // the ancestor stages are walked inside this call (see below)
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

Stage resolution works backwards: starting from the final ResultStage, the dependency chain is walked towards the front looking for shuffle operations, and a ShuffleMapStage is created for every shuffle found.

  /**
   * Registers the given jobId among the jobs that need the given stage and
   * all of that stage's ancestors.
   */
  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
    def updateJobIdStageIdMapsList(stages: List[Stage]) {
      if (stages.nonEmpty) {
        val s = stages.head
        s.jobIds += jobId
        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
        // getParentStages() implements the core stage-resolution algorithm
        val parents: List[Stage] = getParentStages(s.rdd, jobId)
        // keep only the parents that are not yet registered for this jobId
        val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
        updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
      }
    }
    updateJobIdStageIdMapsList(List(stage))
  }

updateJobIdStageIdMaps works through a local recursive helper (a closure); the getParentStages() it calls is where the stages are actually resolved.

  /**
   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
   * provided jobId if they haven't already been created with a lower jobId.
   */
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    // walk the dependency graph with an explicit stack and create stages along the way
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        // examine every dependency of this RDD
        for (dep <- r.dependencies) {
          dep match {
          // a shuffle dependency marks a stage boundary: add a ShuffleMapStage as a parent
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, jobId)
          // a narrow dependency stays in the current stage: push its RDD to visit later
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

The code above traverses the DAG (the RDD dependency graph) with an explicit stack, creating a stage whenever it meets a shuffle dependency. Note that it collects every ShuffleMapStage reachable from the ResultStage without crossing another shuffle; since narrow dependencies are traversed freely, a parent found here is not necessarily a direct dependency of the final RDD. What about the dependencies of each parent stage? They are resolved by recursing back into getParentStages(), via getShuffleMapStage and newShuffleMapStage below.
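
To make the traversal concrete, here is a toy, self-contained version of the same idea, with RDDs and dependencies reduced to small case classes (none of these names are Spark's; it runs as-is in a Scala worksheet or the REPL):

import scala.collection.mutable

sealed trait ToyDep { def rdd: ToyRdd }
case class Narrow(rdd: ToyRdd)  extends ToyDep   // stays inside the current stage
case class Shuffle(rdd: ToyRdd) extends ToyDep   // cuts a new stage
case class ToyRdd(name: String, deps: List[ToyDep] = Nil)
case class ToyStage(lastRdd: ToyRdd, parents: List[ToyStage])

// walk the graph with an explicit stack, cut at shuffles,
// and recurse for the ancestors of each parent stage
def parentStages(rdd: ToyRdd): List[ToyStage] = {
  val parents = mutable.ListBuffer[ToyStage]()
  val visited = mutable.HashSet[ToyRdd]()
  val toVisit = mutable.Stack[ToyRdd](rdd)
  while (toVisit.nonEmpty) {
    val r = toVisit.pop()
    if (visited.add(r)) {
      r.deps.foreach {
        case Shuffle(parent) => parents += ToyStage(parent, parentStages(parent))
        case Narrow(parent)  => toVisit.push(parent)
      }
    }
  }
  parents.toList
}

// word-count shaped lineage: textFile -> flatMap -> mapToPair --shuffle--> reduceByKey
val lines  = ToyRdd("textFile")
val words  = ToyRdd("flatMap", List(Narrow(lines)))
val pairs  = ToyRdd("mapToPair", List(Narrow(words)))
val counts = ToyRdd("reduceByKey", List(Shuffle(pairs)))
println(parentStages(counts).map(_.lastRdd.name))   // List(mapToPair): one ShuffleMapStage feeding the ResultStage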

  /**
   * Get or create a shuffle map stage for the given shuffle dependency's map side.
   * The jobId value passed in will be used if the stage doesn't already exist with
   * a lower jobId (jobId always increases across jobs.)
   * getParentStages() calls this method whenever it encounters a shuffle dependency and needs the corresponding stage
   */
  private def getShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int): ShuffleMapStage = {
    shuffleToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) => stage
      case None =>
        // We are going to register ancestor shuffle dependencies
        registerShuffleDependencies(shuffleDep, jobId)
        // Then register current shuffleDep
        // create the ShuffleMapStage for this shuffle dependency
        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
        shuffleToMapStage(shuffleDep.shuffleId) = stage

        stage
    }
  }

  /**
   * Create a shuffle map Stage for the given RDD.  The stage will also be associated with the
   * provided jobId.  If a stage for the shuffleId existed previously so that the shuffleId is
   * present in the MapOutputTracker, then the number and location of available outputs are
   * recovered from the MapOutputTracker
   */
  private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.size
    // create the new ShuffleMapStage
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      for (i <- 0 until locs.size) {
        stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
      }
      stage.numAvailableOutputs = locs.count(_ != null)
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
    }
    stage
  }

  /**
  * This method ends up calling updateJobIdStageIdMaps() again, which is what makes the resolution recursive.
  */
  private def newShuffleMapStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: ShuffleDependency[_, _, _],
      jobId: Int,
      callSite: CallSite): ShuffleMapStage = {
    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
      jobId, callSite, shuffleDep)

    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

At this point the stages have been fully resolved; what remains is to turn each stage into tasks and hand them to the TaskScheduler for scheduling.

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        // if any parent stage is still missing, the parents have to run first
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // no missing parents: submit this stage's tasks
          submitMissingTasks(stage, jobId