spark原始碼《四》task提交
接著spark原始碼《三》Stage劃分繼續扯,當stage所有的父stage都已經執行,則調submitMissingTasks(stage)方法,
submitMissingTasks()
val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] val finalStage = newStage(finalRdd, None) val numOutputParts: Int = partitions.size//分割槽個數 val finished = new Array[Boolean](numOutputParts)//儲存已經執行的分割槽,一個分割槽對應一個task val outputParts = partitions.toArray//分割槽 def submitMissingTasks(stage: Stage) { //pendingTasks儲存的是stage未執行的task集合 val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet) var tasks = ArrayBuffer[Task[_]]()//儲存要提交的task if (stage == finalStage) {//判斷是否是finalStage for (id <- 0 until numOutputParts if (!finished(id))) {//遍歷找出未執行的分割槽下標 val part = outputParts(id)//通過下標找到分割槽 val locs = getPreferredLocs(finalRdd, part)//獲取優先位置 //建立一個ResultTask,加入到要提交的task陣列中 tasks += new ResultTask(runId, finalStage.id, finalRdd, func, part, locs, id) } } else { //如果不是finalStage for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { //遍歷分割槽,如果某個分割槽沒有輸出位置 val locs = getPreferredLocs(stage.rdd, p)//獲取優先位置 //建立一個ShuffleMapTask tasks += new ShuffleMapTask(runId, stage.id, stage.rdd, stage.shuffleDep.get, p, locs) } } myPending ++= tasks //將tasks加到未執行的tasks集合 submitTasks(tasks, runId)//提交tasks }
可以看到,如果stage是finalStage,則建立一個ResultTask,直接返回一個結果,如果不是finalStage,建立一個ShuffleMapTask,將計算完的資料寫入磁碟,等待後續task拉取,ResultTask與ShuffleMapTask區別,可看spark原始碼《二》Task。
接下來我們去看submitTasks()方法,
我看的是早期程式碼,用的是mesos排程,關於mesos排程的細節可以下載原始碼瞭解。
private val activeJobs = new HashMap[Int, Job]//儲存jobId,new SimpleJob() private var activeJobsQueue = new ArrayBuffer[Job] private val jobTasks = new HashMap[Int, HashSet[String]]//儲存jobId,task集合 def submitTasks(tasks: Seq[Task[_]], runId: Int) { logInfo("Got a job with " + tasks.size + " tasks") waitForRegister()//等待mesos註冊 this.synchronized { val jobId = newJobId()//生成jobId val myJob = new SimpleJob(this, tasks, runId, jobId)//建立SimpleJob例項 activeJobs(jobId) = myJob//加入Map中 activeJobsQueue += myJob//加入變長陣列 logInfo("Adding job with ID " + jobId) jobTasks(jobId) = HashSet.empty[String] } driver.reviveOffers();//請求資源執行myjob }
建立了SimpleJob例項後,就請求執行了。
下面寫個例子,對提交整個過程做個小結
val data=sc.parallelize(List("tom","jerry","zlq","wnn","hehe","eason"),2)
val mapd=data.map(x=>(x.length,1))
val redd=mapd.reduceByKey(_+_,3)//3為新RDD的分割槽數
val coud=redd.collect()
1.首先建立ParallelCollection(也繼承RDD),分割槽為2,無依賴
2.然後執行map運算元,生成MappedRDD,分割槽為2,依賴為 OneToOneDependency(ParallelCollection)
3.執行reduceByKey運算元,生成ShuffleRDD,分割槽為3,依賴為ShuffleDependency(MappedRDD),
4.執行count運算元,觸發sc.runJob()-->DAGScheduler.runJob(ShuffleRDD)
建立stage:val finalStage = newStage(ShuffleRDD, None)
接下來:submitStage(finalStage),先調getMissingParentStages(finalStage)劃分stage並獲取父stage,
呼叫submitStage()提交父stage,submitMissingTasks(父stage)建立一個ShuffleMapTask,生成三個本地檔案,等待ResultTask拉取。