一個Spark job的生命歷程
阿新 • • 發佈:2018-01-21
war result onf blog 超過 lsit fde mark ensure 一個job的生命歷程
dagScheduler.runJob //(1) --> submitJob ( eventProcessLoop.post(JobSubmitted,***) //(2) --> eventProcessLoop //(3) --> onReceive(event: DAGSchedulerEvent) //(4) --> doOnReceive(event: DAGSchedulerEvent) //(5) --> case JobSubmitted //(6) --> dagScheduler.handleJobSubmitted //(1)所有的action算子都會觸發一個job的調度,經過多次不同的runjob重載後停在這裏調度 submitJob (2)調用eventProcessLoop方法,並發送 JobSubmitted 消息給DAGSchedulerEventProcessLoop(DAGScheduler的循環響應函數體) (3)eventProcessLoop = new DAGSchedulerEventProcessLoop(this) (4)onReceive 函數是接受 DAGSchedulerEventProcessLoop DAG調度程序的事件接受函數 (5)doOnReceive 實際是步驟4的事件處理函數 (6)根據步驟2的發送事件,觸發 JobSubmitted 這個事件響應 (7)dagScheduler 的核心入口 (8)使用觸發的job的最後一個RDD創建一個 finalstage,並且放入內存緩存中 stageIdToStage (9)使用 finalStage 創建一個job。這個job最後一個stage就是final stage (10)(11)(12)(13)(14)(15)把 job 加入各種內存緩存中,其實就是各個數據結構 (16)提交fianlStage。總是從最後開始往前推測。 (17)獲取當前stage的父stage。stage的劃分算法,主要在這裏。waitingForVisit = new Stack[RDD[_]]。棧結構,從最後的stage往前的stage 放進棧中,實現先進後出。符合程序調用順序。 (18)獲取最後一個stage,finalstage (19)生成一個 ShuffleMapStage (20)利用finalestage 生成一個job (21)劃分和提交stage算法精髓,劃分好stage之後全部放在waiting stage 數據結構中 (22)提交所有在 waiting stage 中的stage,從stage0...finalstage (23)檢查等待的階段,現在有資格重新提交。提交依賴於給定父級階段的階段。當父階段完成時調用成功 (24)所有的stage劃分完並提交結束 ------------------------------------------------------------------------------ stage劃分算法非常重要,精通spark,必須對stage劃分算法很清晰,知道自己編寫的spark程序被劃分為幾個job,每個job被劃分為幾個stage, 每個stage包含了哪些代碼,只有知道每個stage包括哪些代碼後。在線上,如果發現某個stage執行特別慢,或者某個stage一直報錯,才能針對 特定的stage包含的代碼排查問題,或性能調優。 stage劃分算法總結: 1.從finalstage倒推(通過 棧 數據結構實現) 2.通過寬依賴,進行stage的劃分 3.通過遞歸,優先提交父stage ------------------------------------------------------------------------------(7) --> finalStage =createResultStage(finalRDD, func, partitions, jobId, callSite) //(8) --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9) --> jobIdToActiveJob(jobId) = job //(10) --> activeJobs += job //(11) --> finalStage.setActiveJob(job) //(12) --> stageIds = jobIdToStageIds(jobId).toArray //(13) --> stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) //(14) --> listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //(15) --> submitStage(finalStage) //(16) --> getMissingParentStages(stage).sortBy(_.id) //(17) --> finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18) --> createShuffleMapStage(dep, firstJobId) //(19) -->stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep) --> job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20) --> submitStage(finalStage) //(21)//劃分和提交stage算法精髓 --> submitMissingTasks(stage, jobId.get) //(22) --> submitWaitingChildStages(stage) //(23) --> markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency)) //(24)
/** * 獲取某個stage的父stage * 對於一個stage,如果它的最後一個RDD的所有依賴都是窄依賴,將不會創建新的stage * 如果其RDD會依賴某個RDD,用寬依賴的RDD創建一個新的stage,並立即返回這個stage * @type {[type]} */ private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { //遍歷RDD的依賴,對於每種具有shuffle的操作,如reduceByKey,groupByKey,countByKey,底層對應了3個RDD: //Map for (dep <- rdd.dependencies) { dep match { //如果是寬依賴 case shufDep: ShuffleDependency[_, _, _] => //使用寬依賴的RDD創建一個 ShuffleMapStage,並且將isShuffleMap 設置為true, //默認最後一個stage不是shuffle不是ShuffleMapStage,但是finalstage之前所有的stage都是ShuffleMapStage val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } //如果是窄依賴 case narrowDep: NarrowDependency[_] => //將依賴的RDD放入棧中 waitingForVisit.push(narrowDep.rdd) } } } } } // waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) { // visit(waitingForVisit.pop()) } missing.toList }------------------------------------------------------------------------------------------------------------------------------- taskScheduler -->taskSchedulerImpl (standalone模式) -->SparkDeploySchedulerBackend (負責創建AppClient, 向master註冊Application) 在TaskSchedulerImpl中,對一個單獨的taskset的任務進行調度.這個類負責追蹤每一個taskset,如果task失敗的話 會負責重試spark,直到超過重試次數,並且會通知延遲調度,為這個taskSet處理本地化機制.它的主要接口是 resourceOffer,在這個接口中,taskset會希望在一個節點上運行一個任務,並且接受任務的狀態改變消息, 來知道它負責的task的狀態改變了.
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks //獲取ttaskSet的task列表 logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { //每個taskSet都會創建一個manager,用於管理每個taskSet,並設定最大失敗次數 maxTaskFailures val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId //嘗試連接task,如果task失敗,會負責重試spark,直到超過重試次數,並且會通知延遲調度 val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) //通過 manager 獲得活著的taskSet stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } //利用已選擇的調度器schedulableBuilder,把一個taskSet的manager加入調度管理池中 /* def initialize(backend: SchedulerBackend) { this.backend = backend schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + s"$schedulingMode") } } schedulableBuilder.buildPools() }*/ schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } /** * 創建 taskScheduler 的時候,就是為 taskSchedulerImpl 創建一個 SparkDeploySchedulerBackend . * 它負責創建AppClient,向master註冊Application */ backend.reviveOffers() }
一個Spark job的生命歷程