spark原始碼《二》Task
上篇主要寫了spark的基本資料抽象RDD,spark原始碼《一》RDD,這篇主要是spark的最小執行單元Task,
當發生shuffle依賴時,會切分stage,每個stage的task數量,由該stage最後rdd的partition數量決定,
task有兩種,shufflemaptask和resulttask,resulttask是finalstage,也就是需要將結果返回給driver的stage,而
shufflemaptask無需將結果返回,需要將結果shuffle後傳給後面的shufflemaptask或者resulttask,類似與
mapreduce的mapper,shuffle完後將資料傳給reducer。
1.Task
class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int)
extends Serializable
abstract class Task[T] extends Serializable {
def run(id: Int): T//執行該task
def preferredLocations: Seq[String] = Nil//獲取優先位置
def generation: Option[Long] = None//當fetch資料失敗時,該值+1
}
TaskContext有三個類引數,分別為:
stageId,表示該task屬於哪個stage
splitId,RDD的partition
attemptId,執行Id
2.DAGTask
為Task的子類
abstract class DAGTask[T](val runId: Int, val stageId: Int) extends Task[T] { //getGeneration通過請求worker或master來獲取當前的generation數 val gen = SparkEnv.get.mapOutputTracker.getGeneration override def generation: Option[Long] = Some(gen) }
3.ResultTask
為DAGTask的子類
class ResultTask[T, U](
runId: Int,
stageId: Int,
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
val partition: Int,
locs: Seq[String],
val outputId: Int)
extends DAGTask[U](runId, stageId) {
val split = rdd.splits(partition)//獲取分割槽
override def run(attemptId: Int): U = {
val context = new TaskContext(stageId, partition, attemptId)
//例項化一個TaskContext物件
func(context, rdd.iterator(split))
//返回一個方法,引數為TaskContext物件,rdd的某個分割槽資料
}
override def preferredLocations: Seq[String] = locs
override def toString = "ResultTask(" + stageId + ", " + partition + ")"
}
可以看到ResultTask的run()方法返回的是一個用於計算某個rdd分割槽方法,方法可以是count(),take()等,直接計算出結果
4.ShuffleMapTask
class ShuffleMapTask(
runId: Int,
stageId: Int,
rdd: RDD[_],
dep: ShuffleDependency[_,_,_],
val partition: Int,
locs: Seq[String])
extends DAGTask[String](runId, stageId)
with Logging {
val split = rdd.splits(partition)
override def run (attemptId: Int): String = {
val numOutputSplits = dep.partitioner.numPartitions //獲取分割槽數
//將引數強轉
val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
val partitioner = dep.partitioner.asInstanceOf[Partitioner]
//建立一個長度為分割槽數的陣列
val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
for (elem <- rdd.iterator(split)) {//遍歷rdd分割槽的元素
val (k, v) = elem.asInstanceOf[(Any, Any)]
var bucketId = partitioner.getPartition(k)//決定該元素去往後一個rdd的哪個分割槽
val bucket = buckets(bucketId)//取陣列下標為bucketId的資料
var existing = bucket.get(k)//通過key獲取value
if (existing == null) {//如果為空
bucket.put(k, aggregator.createCombiner(v))//新建累加器,將k,v放入
} else {
bucket.put(k, aggregator.mergeValue(existing, v))//否則,直接將v放入累加器
}
}
val ser = SparkEnv.get.serializer.newInstance()
for (i <- 0 until numOutputSplits) {
//建立檔案,準備寫資料
val file = SparkEnv.get.shuffleManager.getOutputFile(dep.shuffleId, partition, i)
//建立寫入檔案資料的流
val out = ser.outputStream(new FastBufferedOutputStream(new FileOutputStream(file)))
out.writeObject(buckets(i).size)//先寫入每個陣列(分割槽)的元素數
val iter = buckets(i).entrySet().iterator()
while (iter.hasNext()) {//遍歷,將陣列資料寫往對應的分割槽檔案
val entry = iter.next()
out.writeObject((entry.getKey, entry.getValue))
}
out.close()
}
return SparkEnv.get.shuffleManager.getServerUri
//返回uri,等待後續task拉取資料
}
override def preferredLocations: Seq[String] = locs
override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
}
可以看到ShuffleMapTask返回的是一個uri,等待後續的task拉取資料,
該方法主要分為兩步,
第一步遍歷rdd分割槽中資料,根據partitioner(可檢視上一篇瞭解分割槽器的兩種型別)決定分割槽中資料去往後續rdd的哪個分割槽,
並將去往同一分割槽的資料寫入下標相同的陣列
第二步遍歷陣列,按分割槽寫入對應的檔案(當後續rdd有n個分割槽時,會寫n個檔案),返回uri,等待後續節點拉取資料
如圖所示,假設父RDD有4個分割槽,子RDD由3個分割槽,當父RDD第一個分割槽,呼叫run()方法時,
會先建立一個長度為3的陣列,遍歷分割槽元素,通過partitioner決定元素去往下標為0或1或2的位置,然後寫入陣列,
接下來建立3個檔案,將資料資料寫入對應的檔案,返回uri,等待fetch