Spark Source Code Analysis: The Sort-Based Shuffle Read/Write Flow
阿新 • Published: 2019-01-22
override def read(): Iterator[Product2[K, C]] = {
// Construct a ShuffleBlockFetcherIterator: an iterator that fetches multiple blocks.
// Local blocks are read from the local BlockManager; remote blocks are fetched over the network.
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
// MapOutputTracker is instantiated when SparkEnv starts up
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))
// Wrap the streams based on the configuration
val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
serializerManager.wrapStream(blockId, inputStream)
}
// Obtain a serializer instance
val serializerInstance = dep.serializer.newInstance()
// Create a <key, value> iterator for each wrapped stream
val recordIter = wrappedStreams.flatMap { wrappedStream =>
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
// If reduce-side aggregation is required
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
// If the map side has already combined the records
if (dep.mapSideCombine) {
// Then aggregate the already-combined results that were read
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
// Merge the per-partition map-side combined results once more by key
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// If there was no map-side combine, aggregate the un-combined <k, v> pairs
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
// If the keys need to be sorted, sort them here. The sort-based shuffle by default only sorts
// records by partitionId; records are not sorted within a partition, so the keyOrdering field
// indicates whether keys should also be sorted inside each partition.
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// To reduce memory pressure and avoid GC overhead, an external sorter is used; when memory
// runs low, the spark.shuffle.spill setting decides whether to spill to disk.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
// No sorting needed: return the aggregated iterator directly
aggregatedIter
}
}
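
The aggregation branch above is easier to see with a concrete Aggregator. The following is a minimal sketch of my own (not part of the Spark source above; the object name, app name and master are arbitrary): a word-count style combineByKey job whose three closures become the dep.aggregator that read() uses. Because combineByKey enables map-side combine by default, the reduce side takes the combineCombinersByKey path and merges partial counts; with mapSideCombine disabled it would take the combineValuesByKey path instead.

import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch (not part of the Spark source above): the three closures passed to
// combineByKey form the Aggregator[K, V, C] that BlockStoreShuffleReader.read() sees.
object AggregatorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("aggregator-sketch")   // hypothetical app name
      .setMaster("local[2]")
      // The two reducer limits referenced in read(): bytes and requests in flight per reduce task
      .set("spark.reducer.maxSizeInFlight", "48m")
      .set("spark.reducer.maxReqsInFlight", Int.MaxValue.toString)
    val sc = new SparkContext(conf)

    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))

    val counts = words.combineByKey(
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue: map-side pre-aggregation
      (c1: Int, c2: Int) => c1 + c2   // mergeCombiners: applied on the reduce side
    )
    counts.collect().foreach(println)
    sc.stop()
  }
}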