1. 程式人生 > >spark原始碼《二》Task

spark原始碼《二》Task

上篇主要寫了spark的基本資料抽象RDD,spark原始碼《一》RDD,這篇主要是spark的最小執行單元Task,

當發生shuffle依賴時,會切分stage,每個stage的task數量,由該stage最後rdd的partition數量決定,

task有兩種,shufflemaptask和resulttask,resulttask是finalstage,也就是需要將結果返回給driver的stage,而

shufflemaptask無需將結果返回,需要將結果shuffle後傳給後面的shufflemaptask或者resulttask,類似與

mapreduce的mapper,shuffle完後將資料傳給reducer。

1.Task

class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) 
extends Serializable


abstract class Task[T] extends Serializable {
  def run(id: Int): T//執行該task
  def preferredLocations: Seq[String] = Nil//獲取優先位置
  def generation: Option[Long] = None//當fetch資料失敗時,該值+1
}

TaskContext有三個類引數,​​​​分別為:
stageId,表示該task屬於哪個stage

splitId,RDD的partition

attemptId,執行Id

2.DAGTask

為Task的子類

abstract class DAGTask[T](val runId: Int, val stageId: Int) extends Task[T] {
  //getGeneration通過請求worker或master來獲取當前的generation數
val gen = SparkEnv.get.mapOutputTracker.getGeneration

  override def generation: Option[Long] = Some(gen)
}

3.ResultTask

為DAGTask的子類

class ResultTask[T, U](
    runId: Int,
    stageId: Int, 
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    val partition: Int, 
    locs: Seq[String],
    val outputId: Int)
  extends DAGTask[U](runId, stageId) {
  
  val split = rdd.splits(partition)//獲取分割槽


  override def run(attemptId: Int): U = {

    val context = new TaskContext(stageId, partition, attemptId)
//例項化一個TaskContext物件

    func(context, rdd.iterator(split))
//返回一個方法,引數為TaskContext物件,rdd的某個分割槽資料
  }

  override def preferredLocations: Seq[String] = locs

  override def toString = "ResultTask(" + stageId + ", " + partition + ")"
}

可以看到ResultTask的run()方法返回的是一個用於計算某個rdd分割槽方法,方法可以是count(),take()等,直接計算出結果

4.ShuffleMapTask

class ShuffleMapTask(
    runId: Int,
    stageId: Int,
    rdd: RDD[_], 
    dep: ShuffleDependency[_,_,_],
    val partition: Int, 
    locs: Seq[String])
  extends DAGTask[String](runId, stageId)
  with Logging {
  
  val split = rdd.splits(partition)  

  override def run (attemptId: Int): String = {
    val numOutputSplits = dep.partitioner.numPartitions //獲取分割槽數
    
    //將引數強轉
    val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
    val partitioner = dep.partitioner.asInstanceOf[Partitioner]
    
    //建立一個長度為分割槽數的陣列
    val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])

    for (elem <- rdd.iterator(split)) {//遍歷rdd分割槽的元素
   
      val (k, v) = elem.asInstanceOf[(Any, Any)]
      var bucketId = partitioner.getPartition(k)//決定該元素去往後一個rdd的哪個分割槽
      val bucket = buckets(bucketId)//取陣列下標為bucketId的資料
      var existing = bucket.get(k)//通過key獲取value

      if (existing == null) {//如果為空
        bucket.put(k, aggregator.createCombiner(v))//新建累加器,將k,v放入
      } else {
        bucket.put(k, aggregator.mergeValue(existing, v))//否則,直接將v放入累加器
      }
    }
    val ser = SparkEnv.get.serializer.newInstance()
    for (i <- 0 until numOutputSplits) {
     //建立檔案,準備寫資料
      val file = SparkEnv.get.shuffleManager.getOutputFile(dep.shuffleId, partition, i)
       
    //建立寫入檔案資料的流
      val out = ser.outputStream(new FastBufferedOutputStream(new FileOutputStream(file)))
    
      out.writeObject(buckets(i).size)//先寫入每個陣列(分割槽)的元素數

      val iter = buckets(i).entrySet().iterator()
      while (iter.hasNext()) {//遍歷,將陣列資料寫往對應的分割槽檔案
        val entry = iter.next()
        out.writeObject((entry.getKey, entry.getValue))
      }
      
      out.close()
    }
    return SparkEnv.get.shuffleManager.getServerUri
    //返回uri,等待後續task拉取資料
  }

  override def preferredLocations: Seq[String] = locs

  override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
}

可以看到ShuffleMapTask返回的是一個uri,等待後續的task拉取資料,

該方法主要分為兩步,

第一步遍歷rdd分割槽中資料,根據partitioner(可檢視上一篇瞭解分割槽器的兩種型別)決定分割槽中資料去往後續rdd的哪個分割槽,

並將去往同一分割槽的資料寫入下標相同的陣列

第二步遍歷陣列,按分割槽寫入對應的檔案(當後續rdd有n個分割槽時,會寫n個檔案),返回uri,等待後續節點拉取資料

 

如圖所示,假設父RDD有4個分割槽,子RDD由3個分割槽,當父RDD第一個分割槽,呼叫run()方法時,

會先建立一個長度為3的陣列,遍歷分割槽元素,通過partitioner決定元素去往下標為0或1或2的位置,然後寫入陣列,

接下來建立3個檔案,將資料資料寫入對應的檔案,返回uri,等待fetch