1. 程式人生 > >spark原始碼《四》task提交

spark原始碼《四》task提交

接著spark原始碼《三》Stage劃分繼續扯,當stage所有的父stage都已經執行,則調submitMissingTasks(stage)方法,

submitMissingTasks()

val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]
val finalStage = newStage(finalRdd, None) 
val numOutputParts: Int = partitions.size//分割槽個數
val finished = new Array[Boolean](numOutputParts)//儲存已經執行的分割槽,一個分割槽對應一個task
val outputParts = partitions.toArray//分割槽

def submitMissingTasks(stage: Stage) {
        //pendingTasks儲存的是stage未執行的task集合
        val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
        var tasks = ArrayBuffer[Task[_]]()//儲存要提交的task
        if (stage == finalStage) {//判斷是否是finalStage

          for (id <- 0 until numOutputParts if (!finished(id))) {//遍歷找出未執行的分割槽下標
            val part = outputParts(id)//通過下標找到分割槽
            val locs = getPreferredLocs(finalRdd, part)//獲取優先位置
            
            //建立一個ResultTask,加入到要提交的task陣列中
            tasks += new ResultTask(runId, finalStage.id, finalRdd, func, part, locs, id)
          }
        } else {
          //如果不是finalStage
          for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
            //遍歷分割槽,如果某個分割槽沒有輸出位置
            val locs = getPreferredLocs(stage.rdd, p)//獲取優先位置
            //建立一個ShuffleMapTask
            tasks += new ShuffleMapTask(runId, stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
          }
        }
        myPending ++= tasks //將tasks加到未執行的tasks集合
        submitTasks(tasks, runId)//提交tasks
      }

可以看到,如果stage是finalStage,則建立一個ResultTask,直接返回一個結果,如果不是finalStage,建立一個ShuffleMapTask,將計算完的資料寫入磁碟,等待後續task拉取,ResultTask與ShuffleMapTask區別,可看spark原始碼《二》Task

接下來我們去看submitTasks()方法,

我看的是早期程式碼,用的是mesos排程,關於mesos排程的細節可以下載原始碼瞭解。

private val activeJobs = new HashMap[Int, Job]//儲存jobId,new SimpleJob()
private var activeJobsQueue = new ArrayBuffer[Job]
private val jobTasks = new HashMap[Int, HashSet[String]]//儲存jobId,task集合

def submitTasks(tasks: Seq[Task[_]], runId: Int) {
    logInfo("Got a job with " + tasks.size + " tasks")
    waitForRegister()//等待mesos註冊
    this.synchronized {
      val jobId = newJobId()//生成jobId
      val myJob = new SimpleJob(this, tasks, runId, jobId)//建立SimpleJob例項
      activeJobs(jobId) = myJob//加入Map中
      activeJobsQueue += myJob//加入變長陣列
      logInfo("Adding job with ID " + jobId)
      jobTasks(jobId) = HashSet.empty[String]
    }
    driver.reviveOffers();//請求資源執行myjob
  }

建立了SimpleJob例項後,就請求執行了。

 

下面寫個例子,對提交整個過程做個小結

    val data=sc.parallelize(List("tom","jerry","zlq","wnn","hehe","eason"),2)
    val mapd=data.map(x=>(x.length,1))
    val redd=mapd.reduceByKey(_+_,3)//3為新RDD的分割槽數
    val coud=redd.collect()

1.首先建立ParallelCollection(也繼承RDD),分割槽為2,無依賴

2.然後執行map運算元,生成MappedRDD,分割槽為2,依賴為 OneToOneDependency(ParallelCollection)

3.執行reduceByKey運算元,生成ShuffleRDD,分割槽為3,依賴為ShuffleDependency(MappedRDD),

4.執行count運算元,觸發sc.runJob()-->DAGScheduler.runJob(ShuffleRDD)

建立stage:val finalStage = newStage(ShuffleRDD, None)

接下來:submitStage(finalStage),先調getMissingParentStages(finalStage)劃分stage並獲取父stage,

呼叫submitStage()提交父stage,submitMissingTasks(父stage)建立一個ShuffleMapTask,生成三個本地檔案,等待ResultTask拉取。