A Detailed Source-Code Analysis of Spark's Shuffle Mechanism
The shuffle process consists of two phases, shuffle write and shuffle read. Since Spark 2.0, hash shuffle has been removed and only sort shuffle remains. The analysis below follows the code:
# 1. ShuffleManager
When Spark initializes SparkEnv, it instantiates the ShuffleManager inside the create() method:
```scala
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
```
Here we can see the two shuffle kinds, sort and tungsten-sort, both of which map to SortShuffleManager; the manager is instantiated via reflection. ShuffleManager is a trait whose core methods are the following:
```scala
private[spark] trait ShuffleManager {

  /**
   * Register a shuffle with the manager and obtain a handle for it.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions. Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

  ...
}
```
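As a side note, the short name is supplied through the spark.shuffle.manager configuration key (which is what config.SHUFFLE_MANAGER above resolves to). A minimal sketch, assuming a local run:
```scala
import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch: "sort" and "tungsten-sort" are the two built-in short names
// and both resolve to SortShuffleManager; a fully-qualified class name of a
// custom ShuffleManager implementation may also be supplied.
val conf = new SparkConf()
  .setAppName("shuffle-manager-demo")
  .setMaster("local[2]")
  .set("spark.shuffle.manager", "sort")
val sc = new SparkContext(conf)
```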
# 2. SortShuffleManager
SortShuffleManager is the sole concrete implementation of ShuffleManager. Its implementations of the three methods above are as follows:
## 2.1 registerShuffle
```scala
/**
 * Obtains a [[ShuffleHandle]] to pass to tasks.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // 1. First, check whether the bypass merge-sort path applies
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // 2. Otherwise, check whether the serialized (tungsten-sort) path can be used
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
```
1. First, check whether the bypass merge-sort path applies. Two conditions must hold: the shuffle dependency performs no map-side aggregation, and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (200 by default). If both hold, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle mechanism is used:
```scala
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // Defaults to 200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```
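To make these conditions concrete, here is a hedged illustration (assuming the default threshold of 200 and the sc from the sketch in section 1): groupByKey performs no map-side combine, so with 100 output partitions its shuffle is registered with a BypassMergeSortShuffleHandle:
```scala
// Hypothetical illustration: groupByKey sets mapSideCombine = false, and
// 100 partitions <= the default bypassMergeThreshold of 200, so this shuffle
// takes the bypass merge-sort path.
val pairs = sc.parallelize(1 to 10000).map(i => (i % 100, i))
pairs.groupByKey(100).count()
```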
2. If the above does not hold, check canUseSerializedShuffle(). If all three of its conditions are satisfied, a SerializedShuffleHandle is returned and the tungsten-sort shuffle mechanism is used:
```scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // The serializer must support relocation of serialized objects
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  // There must be no map-side aggregation
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
  // The partition count must not exceed 16777215 + 1 (2^24)
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
```
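The relocation condition is usually the deciding one: KryoSerializer supports relocation of serialized objects (when its auto-reset is enabled, which is the default), whereas JavaSerializer does not. A minimal sketch of switching to Kryo:
```scala
import org.apache.spark.SparkConf

// A hedged sketch: with Kryo as the serializer, a shuffle with no map-side
// aggregation and at most 16777216 partitions is registered with a
// SerializedShuffleHandle and takes the tungsten-sort path.
val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```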
3. If neither of the above applies, a BaseShuffleHandle is returned and the baseline sort shuffle mechanism is used.
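Conversely (again a hedged example, reusing pairs from the sketch above): reduceByKey sets mapSideCombine = true, which fails both checks, so its shuffle always falls back to a BaseShuffleHandle:
```scala
// Hypothetical example: map-side aggregation rules out both the bypass and the
// serialized paths, so this shuffle gets a BaseShuffleHandle (and, in getWriter
// below, a SortShuffleWriter).
pairs.reduceByKey(_ + _, 100).count()
```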
## 2.2 getReader
```scala
/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
```
Regardless of the handle type, a BlockStoreShuffleReader is returned. The MapOutputTracker is first queried for the sizes and locations of the map output blocks covering the requested partition range, and the reader then fetches and deserializes those blocks.
## 2.3 getWriter
```scala
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // Pick a different ShuffleWriter depending on the handle type
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}
```
A different ShuffleWriter is returned depending on the handle: a SerializedShuffleHandle yields an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle yields a BypassMergeSortShuffleWriter, and a BaseShuffleHandle yields a SortShuffleWriter.
# 3. The Three Writer Implementations
As described above: when the bypass mechanism applies, BypassMergeSortShuffleWriter is used; when the serializer supports relocation, there is no map-side aggregation, and the partition count does not exceed 16777215 + 1 (i.e., 16777216), UnsafeShuffleWriter is used; otherwise SortShuffleWriter is used.
## 3.1 BypassMergeSortShuffleWriter
BypassMergeSortShuffleWriter extends ShuffleWriter and is implemented in Java. On the map side it writes one temporary file per reduce partition, merges them into a single output file, and generates an index file that records each partition's start offset within that file.
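As a minimal sketch (not Spark source) of that index layout, assuming the format used by IndexShuffleBlockResolver: the index file holds numPartitions + 1 longs, and partition i occupies bytes [offsets(i), offsets(i + 1)) of the data file:
```scala
import java.io.{DataInputStream, FileInputStream}

// Decode a shuffle index file under the assumed layout: a cumulative offset
// table of numPartitions + 1 longs, starting at 0 and ending at the total
// length of the data file.
def readPartitionOffsets(indexFile: String, numPartitions: Int): Array[Long] = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try Array.fill(numPartitions + 1)(in.readLong())
  finally in.close()
}
```
The write() method itself is as follows: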
```java
@Override
public void write(