1. 程式人生 > >一個Spark job的生命歷程

一個Spark job的生命歷程

war result onf blog 超過 lsit fde mark ensure

一個job的生命歷程
dagScheduler.runJob //(1)
--> submitJob ( eventProcessLoop.post(JobSubmitted,***) //(2)
    --> eventProcessLoop //(3)
        --> onReceive(event: DAGSchedulerEvent) //(4)
            --> doOnReceive(event: DAGSchedulerEvent) //(5)
                --> case JobSubmitted //(6)
                    --> dagScheduler.handleJobSubmitted //
(7) --> finalStage =createResultStage(finalRDD, func, partitions, jobId, callSite) //(8) --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9) --> jobIdToActiveJob(jobId) = job //(10) --> activeJobs += job //
(11) --> finalStage.setActiveJob(job) //(12) --> stageIds = jobIdToStageIds(jobId).toArray //(13) --> stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //(14) --> listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //
(15) --> submitStage(finalStage) //(16) --> getMissingParentStages(stage).sortBy(_.id) //(17) --> finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18) --> createShuffleMapStage(dep, firstJobId) //(19) -->stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep) --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20) --> submitStage(finalStage) //(21)//劃分和提交stage算法精髓 --> submitMissingTasks(stage, jobId.get) //(22) --> submitWaitingChildStages(stage) //(23) --> markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency)) //(24)
(1)所有的action算子都會觸發一個job的調度,經過多次不同的runjob重載後停在這裏調度 submitJob (2)調用eventProcessLoop方法,並發送 JobSubmitted 消息給DAGSchedulerEventProcessLoop(DAGScheduler的循環響應函數體) (3)eventProcessLoop = new DAGSchedulerEventProcessLoop(this) (4)onReceive 函數是接受 DAGSchedulerEventProcessLoop DAG調度程序的事件接受函數 (5)doOnReceive 實際是步驟4的事件處理函數 (6)根據步驟2的發送事件,觸發 JobSubmitted 這個事件響應 (7)dagScheduler 的核心入口 (8)使用觸發的job的最後一個RDD創建一個 finalstage,並且放入內存緩存中 stageIdToStage (9)使用 finalStage 創建一個job。這個job最後一個stage就是final stage (10)(11)(12)(13)(14)(15)把 job 加入各種內存緩存中,其實就是各個數據結構 (16)提交fianlStage。總是從最後開始往前推測。 (17)獲取當前stage的父stage。stage的劃分算法,主要在這裏。waitingForVisit = new Stack[RDD[_]]。棧結構,從最後的stage往前的stage 放進棧中,實現先進後出。符合程序調用順序。 (18)獲取最後一個stage,finalstage (19)生成一個 ShuffleMapStage (20)利用finalestage 生成一個job (21)劃分和提交stage算法精髓,劃分好stage之後全部放在waiting stage 數據結構中 (22)提交所有在 waiting stage 中的stage,從stage0...finalstage (23)檢查等待的階段,現在有資格重新提交。提交依賴於給定父級階段的階段。當父階段完成時調用成功 (24)所有的stage劃分完並提交結束 ------------------------------------------------------------------------------ stage劃分算法非常重要,精通spark,必須對stage劃分算法很清晰,知道自己編寫的spark程序被劃分為幾個job,每個job被劃分為幾個stage, 每個stage包含了哪些代碼,只有知道每個stage包括哪些代碼後。在線上,如果發現某個stage執行特別慢,或者某個stage一直報錯,才能針對 特定的stage包含的代碼排查問題,或性能調優。 stage劃分算法總結: 1.從finalstage倒推(通過 棧 數據結構實現) 2.通過寬依賴,進行stage的劃分 3.通過遞歸,優先提交父stage ------------------------------------------------------------------------------
/**
* 獲取某個stage的父stage
* 對於一個stage,如果它的最後一個RDD的所有依賴都是窄依賴,將不會創建新的stage
* 如果其RDD會依賴某個RDD,用寬依賴的RDD創建一個新的stage,並立即返回這個stage
* @type {[type]}
*/
private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
            //遍歷RDD的依賴,對於每種具有shuffle的操作,如reduceByKey,groupByKey,countByKey,底層對應了3個RDD:
            //Map
          for (dep <- rdd.dependencies) {
            dep match {
                //如果是寬依賴
              case shufDep: ShuffleDependency[_, _, _] =>
                  //使用寬依賴的RDD創建一個 ShuffleMapStage,並且將isShuffleMap 設置為true,
                  //默認最後一個stage不是shuffle不是ShuffleMapStage,但是finalstage之前所有的stage都是ShuffleMapStage
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              
                //如果是窄依賴
              case narrowDep: NarrowDependency[_] =>
              //將依賴的RDD放入棧中
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    //
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
    //
      visit(waitingForVisit.pop())
    }
    missing.toList
  }
------------------------------------------------------------------------------------------------------------------------------- taskScheduler -->taskSchedulerImpl (standalone模式) -->SparkDeploySchedulerBackend (負責創建AppClient, 向master註冊Application) 在TaskSchedulerImpl中,對一個單獨的taskset的任務進行調度.這個類負責追蹤每一個taskset,如果task失敗的話 會負責重試spark,直到超過重試次數,並且會通知延遲調度,為這個taskSet處理本地化機制.它的主要接口是 resourceOffer,在這個接口中,taskset會希望在一個節點上運行一個任務,並且接受任務的狀態改變消息, 來知道它負責的task的狀態改變了.
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks //獲取ttaskSet的task列表
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      //每個taskSet都會創建一個manager,用於管理每個taskSet,並設定最大失敗次數 maxTaskFailures
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      //嘗試連接task,如果task失敗,會負責重試spark,直到超過重試次數,並且會通知延遲調度
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      //通過 manager 獲得活著的taskSet
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      //利用已選擇的調度器schedulableBuilder,把一個taskSet的manager加入調度管理池中
      /*
      def initialize(backend: SchedulerBackend) {
        this.backend = backend
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
            case _ =>
              throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
              s"$schedulingMode")
          }
        }
        schedulableBuilder.buildPools()
      }*/
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    /**
      * 創建 taskScheduler 的時候,就是為 taskSchedulerImpl 創建一個 SparkDeploySchedulerBackend .
      * 它負責創建AppClient,向master註冊Application
      */
    backend.reviveOffers()
  }

一個Spark job的生命歷程