
# Spark Shuffle Mechanism: A Detailed Source Code Walkthrough

The shuffle process consists of two phases, shuffle write and shuffle read. Since version 2.0, hash shuffle has been removed and only sort shuffle remains. The analysis below walks through the source code.

# 1.ShuffleManager

When Spark initializes SparkEnv, the create() method instantiates the ShuffleManager:

```scala
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
```

Both the sort and tungsten-sort short names map to SortShuffleManager, and the ShuffleManager is created via reflection. ShuffleManager is a trait whose core methods are the following:

```scala
private[spark] trait ShuffleManager {

  /**
   * Registers a shuffle and returns a handle.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Gets a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * Gets a reader for a range of reduce partitions. Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

  ...
}
```

# 2.SortShuffleManager

SortShuffleManager is the only implementation of ShuffleManager. It implements the three methods above as follows.

## 2.1 registerShuffle

```scala
/**
 * Obtains a [[ShuffleHandle]] to pass to tasks.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // 1. First check whether the shuffle qualifies for bypass merge-sort
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // 2. Otherwise check whether the map output can be kept in serialized form
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
```

1. shouldBypassMergeSort() first checks whether the shuffle qualifies for the bypass merge-sort path. Two conditions must hold: the shuffle dependency performs no map-side aggregation, and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (200 by default). If both hold, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle mechanism is used; a runnable sketch of how this decision plays out follows the code below.

```scala
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // Defaults to 200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```
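To make the two conditions concrete, here is a minimal driver sketch (local mode; the app name and numbers are purely illustrative). With the default threshold of 200, a 100-partition groupByKey qualifies for bypass because it does no map-side aggregation, while reduceByKey does not qualify because it declares mapSideCombine = true:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("bypass-demo") // hypothetical app name
  // Set explicitly only for illustration; 200 is already the default.
  .set("spark.shuffle.sort.bypassMergeThreshold", "200")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 10000).map(i => (i % 100, i))

// 100 partitions <= 200 and groupByKey sets mapSideCombine = false,
// so registerShuffle returns a BypassMergeSortShuffleHandle here.
pairs.groupByKey(100).count()

// reduceByKey aggregates on the map side (mapSideCombine = true),
// so shouldBypassMergeSort returns false for this shuffle.
pairs.reduceByKey(_ + _, 100).count()

sc.stop()
```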
2. If the bypass conditions are not met, canUseSerializedShuffle() is checked next. If its three conditions are all satisfied, a SerializedShuffleHandle is returned and the tungsten-sort shuffle mechanism is used (a configuration sketch follows item 3 below):

```scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // The serializer must support relocation of serialized objects
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  // There must be no map-side aggregation
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
  // The partition count must not exceed 16777215 + 1
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
```

3. If neither set of conditions is met, a BaseShuffleHandle is returned and the basic sort shuffle mechanism is used.
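Of the three conditions, the serializer one is the easiest to influence from user code: KryoSerializer supports relocation of serialized objects under its default settings, while the default JavaSerializer does not. A minimal configuration sketch, for illustration only:

```scala
import org.apache.spark.SparkConf

// With KryoSerializer, supportsRelocationOfSerializedObjects is true by
// default, so a shuffle with no map-side aggregation and a partition count
// within the 2^24 limit can take the SerializedShuffleHandle (tungsten-sort) path.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Under the default JavaSerializer, the same shuffle would instead fall
// through to the BaseShuffleHandle / SortShuffleWriter path.
```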
## 2.2 getReader

```scala
/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
```

This always returns a BlockStoreShuffleReader.

## 2.3 getWriter

```scala
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // Pick the ShuffleWriter that matches the handle
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}
```

The writer is chosen by the handle type: a SerializedShuffleHandle gets an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle gets a BypassMergeSortShuffleWriter, and any other handle gets a SortShuffleWriter.

# 3.The Three Writer Implementations

As described above, when the bypass mechanism is enabled, BypassMergeSortShuffleWriter is used; when the serializer supports relocation, there is no map-side aggregation, and the partition count does not exceed 16777215 + 1, UnsafeShuffleWriter is used; otherwise SortShuffleWriter is used.

## 3.1 BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter extends ShuffleWriter and is implemented in Java. It merges the map side's many per-partition output files into a single file, and at the same time produces an index file recording the start offset of each partition (a sketch of this index layout follows the code below). Its write() method begins as follows:

```java
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  // ...
```
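The index file just mentioned has a simple layout: numPartitions + 1 longs, where the i-th value is the start offset of partition i in the merged data file and the last value is the total file length. A minimal sketch of how a reader could use it; partitionRange() is a hypothetical helper (Spark itself resolves this through IndexShuffleBlockResolver):

```scala
import java.io.{DataInputStream, FileInputStream}

// Hypothetical helper: the index file holds numPartitions + 1 longs, where
// offset i is the start of partition i in the data file, so partition i's
// bytes are the range [offset(i), offset(i + 1)).
def partitionRange(indexFile: String, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    // Each offset is an 8-byte long; seek to the entry for this partition.
    // (skipBytes may skip fewer bytes in general; fine for a local-file sketch.)
    in.skipBytes(reduceId * 8)
    val start = in.readLong() // start offset of this partition
    val end = in.readLong()   // start of the next partition, i.e. our end
    (start, end)
  } finally {
    in.close()
  }
}
```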