
Spark Source Code Analysis: the Sort-Based Shuffle Read/Write Flow

On the reduce side, the shuffle reader's read() method (BlockStoreShuffleReader) fetches the map output blocks and turns them into the final iterator of records:

override def read(): Iterator[Product2[K, C]] = {
  // Construct a ShuffleBlockFetcherIterator: an iterator that fetches multiple blocks.
  // Local blocks are read from the local BlockManager; remote blocks are fetched over the network.
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    // MapOutputTracker is instantiated when SparkEnv starts up
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
    SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

  // Wrap the streams according to the configuration (e.g. compression / encryption)
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    serializerManager.wrapStream(blockId, inputStream)
  }

  // Get a serializer instance
  val serializerInstance = dep.serializer.newInstance()

  // Create a <key, value> iterator for each stream
  val recordIter = wrappedStreams.flatMap { wrappedStream =>
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }

  // Update the context task metrics for each record read.
  val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map { record =>
      readMetrics.incRecordsRead(1)
      record
    },
    context.taskMetrics().mergeShuffleReadMetrics())

  // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

  // If aggregation is required on the reduce side
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    // If the map side has already combined
    if (dep.mapSideCombine) {
      // Then aggregate the already-combined results that were read:
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      // merge the per-map-partition combined results once more
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else {
      // If the map side did not combine, aggregate the unmerged <k, v> records
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }

  // If the keys need to be sorted, sort them here. In the sort-based shuffle implementation,
  // the data is by default only sorted by partitionId; records are not sorted within each
  // partition, so the keyOrdering field controls whether keys inside a partition are sorted too.
  dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // To reduce memory pressure and avoid GC overhead, an external sorter is used; when memory
      // is insufficient, the spark.shuffle.spill setting decides whether to spill to disk
      val sorter =
        new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
      CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
    case None =>
      // No sorting needed; return the aggregated iterator directly
      aggregatedIter
  }
}
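The two aggregation branches differ only in what the map side has already done. Here is a minimal conceptual sketch in plain Scala (not Spark's actual Aggregator code), assuming an average-style aggregation where the value type is Double and the combiner type is a (sum, count) pair; the names createCombiner / mergeValue / mergeCombiners only mirror the roles of the fields of Spark's Aggregator:

object CombineSketch {
  type V = Double        // value type coming out of the shuffle
  type C = (Double, Int) // combiner type: (running sum, running count)

  def createCombiner(v: V): C = (v, 1)
  def mergeValue(c: C, v: V): C = (c._1 + v, c._2 + 1)
  def mergeCombiners(c1: C, c2: C): C = (c1._1 + c2._1, c1._2 + c2._2)

  // dep.mapSideCombine == false: the stream carries raw (K, V) records, so each value is
  // folded into a combiner (the role of combineValuesByKey).
  def combineValuesByKey(records: Iterator[(String, V)]): Map[String, C] =
    records.foldLeft(Map.empty[String, C]) { case (acc, (k, v)) =>
      val updated = acc.get(k) match {
        case Some(c) => mergeValue(c, v)   // key seen before: fold the value into its combiner
        case None    => createCombiner(v)  // first value for this key
      }
      acc.updated(k, updated)
    }

  // dep.mapSideCombine == true: the stream carries (K, C) records that were already partially
  // combined on the map side, so only combiner merging is left (the role of combineCombinersByKey).
  def combineCombinersByKey(partials: Iterator[(String, C)]): Map[String, C] =
    partials.foldLeft(Map.empty[String, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }

  def main(args: Array[String]): Unit = {
    println(combineValuesByKey(Iterator(("a", 1.0), ("a", 3.0), ("b", 2.0))))   // a -> (4.0,2), b -> (2.0,1)
    println(combineCombinersByKey(Iterator(("a", (4.0, 2)), ("a", (6.0, 1))))) // a -> (10.0,3)
  }
}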
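Which branches actually run depends on how the upstream operator built the shuffle dependency. The following is a small, hypothetical driver program (assuming a local master and Spark 2.x behaviour matching the source above): reduceByKey creates a dependency with an Aggregator and mapSideCombine = true but no keyOrdering, so its read goes through combineCombinersByKey and skips the ExternalSorter, while sortByKey sets keyOrdering and takes the sorting branch. It also shows where the two fetch limits read at the top of read() can be tuned; the values are just examples:

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleReadBranches {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shuffle-read-branches")
      .setMaster("local[2]")
      // The two fetch-side knobs read in read() above
      .set("spark.reducer.maxSizeInFlight", "96m") // max bytes of block data in flight per reducer
      .set("spark.reducer.maxReqsInFlight", "64")  // max number of outstanding fetch requests
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 2)

    // Aggregator defined, mapSideCombine = true, keyOrdering = None:
    // the read side merges map-side combiners and does not sort.
    val counts = pairs.reduceByKey(_ + _).collect()

    // keyOrdering defined: the read side sorts the fetched records with an ExternalSorter.
    val sorted = pairs.sortByKey().collect()

    println(counts.mkString(", "))
    println(sorted.mkString(", "))
    sc.stop()
  }
}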