[spark] Shuffle Read解析 (Sort Based Shuffle)

本文將講解shuffle Reduce部分,shuffle的下游Stage的第一個rdd是ShuffledRDD,通過其compute方法來獲取上游Stage Shuffle Write溢寫到磁碟檔案資料的一個迭代器:

 /**
  * Computes this RDD partition by reading the matching shuffle output.
  *
  * Asks the shuffle manager for a reader covering exactly one reduce partition
  * (`split.index` up to `split.index + 1`) and returns the reader's iterator.
  *
  * @param split   the partition to compute; only its `index` is used
  * @param context the task context passed through to the shuffle reader
  * @return an iterator over the records read for this partition
  */
 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val shuffleDep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    val reducePartition = split.index
    val reader = SparkEnv.get.shuffleManager.getReader(
      shuffleDep.shuffleHandle, reducePartition, reducePartition + 1, context)
    reader.read().asInstanceOf[Iterator[(K, C)]]
  }


/**
 * Creates the reader used to fetch shuffle output for a range of reduce partitions.
 *
 * @param handle         the shuffle handle; cast to `BaseShuffleHandle[K, _, C]`
 * @param startPartition first partition of the range passed to the reader
 * @param endPartition   end of the partition range passed to the reader
 * @param context        the task context of the reading task
 * @return a `BlockStoreShuffleReader` for the requested range
 */
override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }


 /**
  * Reads the shuffle output for the requested partition range and returns it as an
  * iterator of key/combiner pairs.
  *
  * Steps: fetch the blocks, wrap each block stream, deserialize it into key/value
  * records, count records in the shuffle read metrics, optionally aggregate, and
  * optionally sort by key.
  *
  * @return the iterator over the (possibly aggregated and sorted) records
  */
 override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      // Metadata describing where the data to read is stored
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      // Maximum number of bytes in flight for remote requests
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

    // Wrap each fetched stream (the article describes this as compression and encryption)
    val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
      serializerManager.wrapStream(blockId, inputStream)
    }

    val serializerInstance = dep.serializer.newInstance()

    // Turn every stream into a key/value iterator
    val recordIter = wrappedStreams.flatMap { wrappedStream =>
      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
    }

    // Update the task metrics after each record is read
    val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
    // Merge the temporary read metrics once the iterator completes
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
      recordIter.map { record =>
        readMetrics.incRecordsRead(1)
        record
      },
      context.taskMetrics().mergeShuffleReadMetrics())

    // An interruptible iterator must be used here in order to support task cancellation
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Values were already combined once on the map side
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // Aggregate on the reduce side only
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // Sort the output when a key ordering is defined
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
        // Stop the sorter once its iterator has been fully consumed
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }


mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)

該方法獲取reduce端資料的來源的元資料,返回的是 Seq[(BlockManagerId, Seq[(BlockId, Long)])],即資料是來自於哪個節點的哪些block的,並且block的資料大小是多少,看看getMapSizesByExecutorId是怎麼實現的:

/**
 * Looks up the map-output metadata of a shuffle and converts it into block
 * locations and sizes for the given reduce partition range.
 *
 * @param shuffleId      the shuffle whose outputs are requested
 * @param startPartition first reduce partition of the range
 * @param endPartition   end of the reduce partition range
 * @return for each block manager, the blocks it holds together with their sizes
 */
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
    // Get the map-output metadata for this shuffle
    val statuses = getStatuses(shuffleId)
    // Convert the format and keep only the metadata of the requested partitions;
    // the conversion runs while holding the lock on the statuses array
    statuses.synchronized {
      return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
  • 傳入shuffleId獲取對應shuffle的所有元資料資訊
  • 轉換格式並獲取指定partition的元資料


/**
 * Returns the map statuses of a shuffle, using the local `mapStatuses` table first
 * and asking the tracker endpoint when they are not available locally.
 *
 * NOTE(review): this excerpt is abridged; the coordination between concurrent
 * fetchers (the `fetching` set is only seen being updated in `finally`) and the
 * return of the fetched value are not shown here.
 *
 * @param shuffleId the shuffle whose map statuses are requested
 * @return the array of map statuses for the shuffle
 */
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
    // Try the local mapStatuses table first
    val statuses = mapStatuses.get(shuffleId).orNull
    if (statuses == null) {
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      val startTime = System.currentTimeMillis
      var fetchedStatuses: Array[MapStatus] = null
      if (fetchedStatuses == null) {
        // We won the race to fetch the statuses; do so
        logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
        // This try-finally prevents hangs due to timeouts:
        try {
          // Ask the tracker for the serialized metadata
          val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
          // Deserialize it
          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
          logInfo("Got the output locations")
          // Store the result in the local mapStatuses table
          mapStatuses.put(shuffleId, fetchedStatuses)
        } finally {
          // Always remove the shuffle from the in-progress set, even on failure
          fetching.synchronized {
            fetching -= shuffleId
    } else {
      return statuses



這裡的mapStatuses就是mapOutputTracker儲存元資料資訊的,mapOutputTracker和Executor一一對應,在該Executor上完成的Shuffle Write的元資料資訊都會儲存在其mapStatuses裡面,另外通過遠端獲取的其他Executor上完成的Shuffle Write的元資料資訊也會在當前的mapStatuses中儲存。

Executor對應的是mapOutputTrackerWorker,而Driver對應的是mapOutputTrackerMaster,兩者都是在例項化SparkEnv的時候建立的,每個在Executor上完成的Shuffle Task的結果都會註冊到driver端的mapOutputTrackerMaster中,即driver端的mapOutputTrackerMaster的mapStatuses儲存著所有元資料資訊,所以當一個Executor上的任務需要獲取一個shuffle的輸出時,會先在自己的mapStatuses中查詢,找不到再和mapOutputTrackerMaster通訊獲取元資料。


// Handles a request for the map output locations of one shuffle: logs the sender's
// address and hands the request to the tracker as a GetMapOutputMessage.
// NOTE(review): `post` is declared elsewhere in this article as returning Unit, so
// `mapOutputStatuses` presumably carries no data - confirm against the full source.
case GetMapOutputStatuses(shuffleId: Int) =>
      val hostPort = context.senderAddress.hostPort
      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
      val mapOutputStatuses = tracker.post(new GetMapOutputMessage(shuffleId, context))


 def post(message: GetMapOutputMessage): Unit = {


// Fixed-size daemon thread pool named "map-output-dispatcher". Its size is read from
// "spark.shuffle.mapOutput.dispatcher.numThreads" (default 8), and one MessageLoop
// is submitted per thread.
private val threadpool: ThreadPoolExecutor = {
    val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)


/**
 * Worker loop that repeatedly takes requests from `mapOutputRequests` and looks up
 * the serialized map output statuses for the requested shuffle.
 *
 * NOTE(review): this excerpt is abridged; the PoisonPill handling body and the
 * reply to the requester are not shown here.
 */
private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            // Take one GetMapOutputMessage from the request queue
            val data = mapOutputRequests.take()
             if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
            val context = data.context
            val shuffleId = data.shuffleId
            val hostPort = context.senderAddress.hostPort
            logDebug("Handling request to send map output locations for shuffle " + shuffleId +
              " to " + hostPort)
            // Get the serialized metadata for this shuffleId
            val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId)
            // Send the data back (the reply call is not shown in this excerpt)
          } catch {
            // Non-fatal errors are logged so that the loop keeps serving requests
            case NonFatal(e) => logError(e.getMessage, e)
      } catch {
        case ie: InterruptedException => // exit


/**
 * Returns the serialized map output statuses of a shuffle, reusing a cached
 * serialized copy when one exists and serializing (then caching) otherwise.
 *
 * NOTE(review): this excerpt is abridged; several closing braces, the boolean
 * results of `checkCachedStatuses` and the final return value are not shown here.
 *
 * @param shuffleId the shuffle whose statuses are requested
 * @return the serialized statuses
 */
def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
    var statuses: Array[MapStatus] = null
    var retBytes: Array[Byte] = null
    var epochGotten: Long = -1

    // Look for the serialized statuses in the cache; on a miss, load the
    // MapStatus array from mapStatuses and remember the epoch it was read at
    def checkCachedStatuses(): Boolean = {
      epochLock.synchronized {
        if (epoch > cacheEpoch) {
          cacheEpoch = epoch
        cachedSerializedStatuses.get(shuffleId) match {
          case Some(bytes) =>
            retBytes = bytes
          case None =>
            logDebug("cached status not found for : " + shuffleId)
            statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus])
            epochGotten = epoch

    if (checkCachedStatuses()) return retBytes
    // Get or create the lock object dedicated to this shuffle
    var shuffleIdLock = shuffleIdLocks.get(shuffleId)
    if (null == shuffleIdLock) {
      val newLock = new Object()
      // in general, this condition should be false - but good to be paranoid
      val prevLock = shuffleIdLocks.putIfAbsent(shuffleId, newLock)
      shuffleIdLock = if (null != prevLock) prevLock else newLock
    // synchronize so we only serialize/broadcast it once since multiple threads call
    // in parallel
    shuffleIdLock.synchronized {
      // Check the cache again now that the per-shuffle lock is held
      if (checkCachedStatuses()) return retBytes

      // Serialize the statuses
      val (bytes, bcast) = MapOutputTracker.serializeMapStatuses(statuses, broadcastManager,
        isLocal, minSizeForBroadcast)
      logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
      // Add them into the table only if the epoch hasn't changed while we were working
      epochLock.synchronized {
        if (epoch == epochGotten) {
          cachedSerializedStatuses(shuffleId) = bytes
          if (null != bcast) cachedSerializedBroadcast(shuffleId) = bcast
        } else {
          logInfo("Epoch changed, not caching!")


再回到getMapSizesByExecutorId方法中,getStatuses得到shuffleID對應的所有的元資料資訊後,通過convertMapStatuses方法將獲得的元資料資訊轉化成形如Seq[(BlockManagerId, Seq[(BlockId, Long)])]格式的位置資訊,用來讀取指定的分割槽的資料:

/**
 * Converts the map statuses of a shuffle into block ids and sizes grouped by the
 * block manager that holds them, for the reduce partitions in
 * `startPartition until endPartition`.
 *
 * @param shuffleId      the shuffle the statuses belong to
 * @param startPartition first reduce partition (inclusive)
 * @param endPartition   last reduce partition (exclusive)
 * @param statuses       the map statuses; the array index is used as the map id
 * @return for each block manager, the shuffle blocks it holds with their sizes
 * @throws MetadataFetchFailedException if any entry of `statuses` is null
 */
private def convertMapStatuses(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    assert (statuses != null)
    // Holds the metadata of the requested partitions, grouped by location
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
    for ((status, mapId) <- statuses.zipWithIndex) {
      if (status == null) {
        val errorMessage = s"Missing an output location for shuffle $shuffleId"
        throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
      } else {
        // One block per (map output, reduce partition) pair, keyed by where it is stored
        for (part <- startPartition until endPartition) {
          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))


這裡的引數statuses:Array[MapStatus]是前面獲取的上游stage所有的shuffle Write 檔案的元資料,並且是按map端的partitionId排序的,通過zipWithIndex將元素和這個元素在陣列中的ID(索引號)組合成鍵/值對,這裡的索引號即是map端的partitionId,再根據shuffleId、mapPartitionId、reducePartitionId來構建ShuffleBlockId(在map端的ShuffleBlockId構建中的reducePartitionId始終是0,因為一個ShuffleMapTask就一個Block,而這裡加入的真正的reducePartitionId在後面通過index檔案獲取對應reduce端partition偏移量的時候需要用到),並估算得到對應資料的大小,因為後面獲取遠端資料的時候需要限制大小,最後返回位置資訊。

至此mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)方法完成,返回了指定分割槽對應的元資料MapStatus資訊。


/**
 * Sets up the fetch: registers cleanup on task completion, splits the blocks into
 * local and remote ones, and queues the remote requests in random order.
 *
 * NOTE(review): this excerpt is abridged; the calls that actually send the remote
 * requests and read the local blocks are not shown here.
 */
private[this] def initialize(): Unit = {
    // Add a task completion callback (called in both success case and failure case) to cleanup.
    context.addTaskCompletionListener(_ => cleanup())

    // Separate local blocks from remote blocks; returns the remote FetchRequests
    val remoteRequests = splitLocalRemoteBlocks()
    // Add the remote requests to the fetchRequests queue in random order
    fetchRequests ++= Utils.randomize(remoteRequests)
    // Requests in flight and bytes in flight must be zero (or non-zero) together
    assert ((0 == reqsInFlight) == (0 == bytesInFlight),
      "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
      ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)

    // Take remote requests from fetchRequests and send them with sendRequest
    // (the call itself is not shown in this excerpt)

    // Requests no longer in the queue are counted as started
    val numFetches = remoteRequests.size - fetchRequests.size
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

    // Fetch the local blocks (the call itself is not shown in this excerpt)
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
  • 區分local blocks和remote blocks,並返回遠端請求FetchRequest加入到fetchRequests佇列中
  • 從fetchRequests取出遠端請求,並使用sendRequest方法傳送請求,獲取遠端資料
  • 獲取本地blocks

先看是怎麼區分local blocks和remote blocks的:

/**
 * Splits `blocksByAddress` into local blocks (recorded in `localBlocks`) and remote
 * blocks, and groups the remote blocks of each address into FetchRequests.
 *
 * NOTE(review): this excerpt is abridged; several closing braces and the final
 * return of `remoteRequests` are not shown here.
 *
 * @return the fetch requests for the remote blocks
 * @throws BlockException if a remote block reports a negative size
 */
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
    // Target size of one request: a fifth of maxBytesInFlight (at least 1), intended
    // to allow several requests to be in flight at the same time
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

    // Collects the remote requests
    val remoteRequests = new ArrayBuffer[FetchRequest]

    // Tracks total number of blocks (including zero sized blocks)
    var totalBlocks = 0
    for ((address, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size
      // Blocks held by the current executor are local; all others are remote
      if (address.executorId == blockManager.blockManagerId.executorId) {
        // Filter out zero-sized blocks
        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
        numBlocksToFetch += localBlocks.size
      } else {
        val iterator = blockInfos.iterator
        var curRequestSize = 0L
        var curBlocks = new ArrayBuffer[(BlockId, Long)]
        while (iterator.hasNext) {
          val (blockId, size) = iterator.next()
          // Skip empty blocks
          if (size > 0) {
            curBlocks += ((blockId, size))
            remoteBlocks += blockId
            numBlocksToFetch += 1
            curRequestSize += size
          } else if (size < 0) {
            throw new BlockException(blockId, "Negative block size " + size)
          // Once the accumulated size reaches the target, emit a FetchRequest
          // and start collecting a new one
          if (curRequestSize >= targetRequestSize) {
            // Add this FetchRequest
            remoteRequests += new FetchRequest(address, curBlocks)
            curBlocks = new ArrayBuffer[(BlockId, Long)]
            logDebug(s"Creating fetch request of $curRequestSize at $address")
            curRequestSize = 0
        // Put the remaining blocks into a final FetchRequest
        if (curBlocks.nonEmpty) {
          remoteRequests += new FetchRequest(address, curBlocks)
    logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
  • 為了增加在遠端節點獲取資料的並行度,將一個請求的大小限制除以5作為最終的大小限制,即每次最多啟動5個執行緒去最多5個節點上讀取資料
  • 判斷是否是本地blocks的條件是block所在的executor和當前executor是否是同一個
  • 遍歷遠端資料節點(Executor節點)的blocks,在一個節點上的請求資料超過大小限制則構建一個FetchRequest並加入到remoteRequests中,最後返回遠端請求remoteRequests,這裡的FetchRequest是對一個請求資料的包裝,包括地址和blockId及大小

區分完local blocks和remote blocks後,將遠端請求加入到了佇列fetchRequests中,並呼叫fetchUpToMaxBytes()來獲取遠端資料:

// Sends queued fetch requests while the in-flight limits allow it. The loop continues
// when nothing is in flight, or when adding the next request keeps both the request
// count within maxReqsInFlight and the byte count within maxBytesInFlight.
// NOTE(review): the loop body is not shown in this excerpt.
private def fetchUpToMaxBytes(): Unit = {
    // Send fetch requests up to maxBytesInFlight
    while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 ||
        (reqsInFlight + 1 <= maxReqsInFlight &&
          bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))) {


 /**
  * Sends one fetch request through `shuffleClient` and queues the outcome of every
  * block (success or failure) in `results`.
  *
  * NOTE(review): this excerpt is abridged; part of the SuccessFetchResult
  * construction and several closing braces are not shown here.
  *
  * @param req the request to send; its size is added to `bytesInFlight`
  */
 private[this] def sendRequest(req: FetchRequest) {
    logDebug("Sending request for %d blocks (%s) from %s".format(
      req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
    bytesInFlight += req.size
    reqsInFlight += 1

    // Build a map from block id (as string) to size
    val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    val remainingBlocks = new HashSet[String]() ++= sizeMap.keys
    val blockIds = req.blocks.map(_._1.toString)

    val address = req.address
    // Fetch the blocks from the remote node through shuffleClient.fetchBlocks
    shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
      new BlockFetchingListener {
        // Store the fetched block in results
        override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
          // Only add the buffer to results queue if the iterator is not zombie,
          // i.e. cleanup() has not been called yet.
          ShuffleBlockFetcherIterator.this.synchronized {
            if (!isZombie) {
              // Increment the ref count because we need to pass this to a different thread.
              // This needs to be released after use.
              remainingBlocks -= blockId
              results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
              logDebug("remainingBlocks: " + remainingBlocks)
          logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))

        // Log the failure and queue a FailureFetchResult for the block
        override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
          results.put(new FailureFetchResult(BlockId(blockId), address, e))

通過shuffleClient的fetchBlocks方法來獲取對應遠端節點上的資料,預設是通過NettyBlockTransferService的fetchBlocks方法實現的,不管是成功還是失敗,都會構建對應的SuccessFetchResult或FailureFetchResult結果並放入results中。


/**
 * Reads every block in `localBlocks` from the local block manager and queues a
 * SuccessFetchResult (with size 0) for each one; on an exception it logs the error
 * and queues a FailureFetchResult.
 *
 * NOTE(review): this excerpt is abridged; the statement that stops the loop after
 * a failure and the closing braces are not shown here.
 */
private[this] def fetchLocalBlocks() {
    val iter = localBlocks.iterator
    while (iter.hasNext) {
      val blockId = iter.next()
      try {
        val buf = blockManager.getBlockData(blockId)
        results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false))
      } catch {
        case e: Exception =>
          // If we see an exception, stop immediately.
          logError(s"Error occurred while fetching local blocks", e)
          results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))


/**
 * Returns the data of a locally stored block as a ManagedBuffer.
 *
 * NOTE(review): the branch taken for shuffle blocks is empty in this excerpt; its
 * body is not shown here.
 *
 * @param blockId the block to read
 * @return a buffer over the block's bytes
 * @throws BlockNotFoundException if a non-shuffle block is not available locally
 */
override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
    } else {
      getLocalBytes(blockId) match {
        case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
        case None =>
          // If this block manager receives a request for a block that it doesn't have then it's
          // likely that the master has outdated block statuses for this block. Therefore, we send
          // an RPC so that this block is marked as being unavailable from this block manager.
          reportBlockStatus(blockId, BlockStatus.empty)
          throw new BlockNotFoundException(blockId.toString)


/**
 * Locates one reduce partition inside a map task's shuffle data file by reading two
 * consecutive 8-byte offsets from the index file.
 *
 * NOTE(review): this excerpt is abridged; the `finally` body and some constructor
 * arguments of FileSegmentManagedBuffer appear to be missing here.
 *
 * @param blockId identifies the shuffle, the map output and the reduce partition
 * @return a buffer over the segment of the data file for this block
 */
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // Find the index file for this shuffle id and map id
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      // Skip to this reduce partition's entry in the index file (8 bytes per entry)
      ByteStreams.skipFully(in, blockId.reduceId * 8)
      // Start offset of the partition
      val offset = in.readLong()
      // End offset of the partition
      val nextOffset = in.readLong()
      // The segment length is the difference between the two offsets
      new FileSegmentManagedBuffer(
        getDataFile(blockId.shuffleId, blockId.mapId),
        nextOffset - offset)
    } finally {



 /**
  * Reads the shuffle output for the requested partition range: fetch the blocks,
  * wrap and deserialize the streams, then optionally aggregate and sort.
  *
  * NOTE(review): this listing is abridged; several closure bodies and closing braces
  * are not shown here.
  */
 override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      // Metadata describing where the data is stored (the article says it is
      // obtained by talking to mapOutputTrackerMaster)
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      // Maximum size of data in flight
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue))

    // Wrap each fetched stream (the article describes this as compression and encryption)
    val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
      serializerManager.wrapStream(blockId, inputStream)

    val serializerInstance = dep.serializer.newInstance()

    // Turn every stream into a key/value iterator
    val recordIter = wrappedStreams.flatMap { wrappedStream =>

    // Update the task metrics after each record is read
    val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
    // Build the iterator that carries the metrics bookkeeping
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
      recordIter.map { record =>

    // An interruptible iterator must be used here in order to support task cancellation
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        // Values were already combined once on the map side
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {
        // Aggregate on the reduce side only
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]

    // Sort the output when a key ordering is defined
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>


// For a successful fetch, pair the block id with an input stream over the fetched
// buffer; a non-fatal error while creating the stream is rethrown through
// throwFetchFailedException for this block and address.
case SuccessFetchResult(blockId, address, _, buf, _) =>
        try {
          (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))
        } catch {
          case NonFatal(t) =>
            throwFetchFailedException(blockId, address, t)

在read()方法的後半部分會進行聚合和排序,和Shuffle Write部分很類似,這裡大致描述一下。


/**
 * Merges values that were already combined (type C) per key, using an
 * ExternalAppendOnlyMap whose combiner creation is `identity` and whose merge
 * functions are both `mergeCombiners`.
 *
 * NOTE(review): this excerpt is abridged; the insertion of `iter` and the returned
 * iterator are not shown here.
 */
def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    updateMetrics(context, combiners)
/**
 * Combines raw values (type V) per key into combiners (type C), using an
 * ExternalAppendOnlyMap built from `createCombiner`, `mergeValue` and
 * `mergeCombiners`.
 *
 * NOTE(review): this excerpt is abridged; the insertion of `iter` and the returned
 * iterator are not shown here.
 */
def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    updateMetrics(context, combiners)
/**
 * Inserts all entries into the in-memory map, merging values that share a key and
 * starting a fresh map whenever `maybeSpill` reports that the current one was spilled.
 *
 * NOTE(review): this excerpt is abridged; several closing braces are not shown here.
 *
 * @param entries the key/value pairs to insert
 * @throws IllegalStateException if `currentMap` is null, i.e. the iterator was
 *                               already requested
 */
def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
    if (currentMap == null) {
      throw new IllegalStateException(
        "Cannot insert new elements into a map after calling iterator")
    // An update function for the map that we reuse across entries to avoid allocating
    // a new closure each time
    var curEntry: Product2[K, V] = null
    // Merge into the existing combiner when the key is present, otherwise create one
    val update: (Boolean, C) => C = (hadVal, oldVal) => {
      if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)

    while (entries.hasNext) {
      curEntry = entries.next()
      // Track the largest estimated size the map has reached
      val estimatedSize = currentMap.estimateSize()
      if (estimatedSize > _peakMemoryUsedBytes) {
        _peakMemoryUsedBytes = estimatedSize
      if (maybeSpill(currentMap, estimatedSize)) {
        currentMap = new SizeTrackingAppendOnlyMap[K, C]
      currentMap.changeValue(curEntry._1, update)




/**
 * Returns an iterator over the combined entries. When nothing was spilled it
 * iterates the in-memory map directly and frees the map on completion; otherwise
 * it returns an ExternalIterator.
 *
 * @throws IllegalStateException if `currentMap` is null, since this iterator is
 *                               destructive and may only be requested once
 */
override def iterator: Iterator[(K, C)] = {
    if (currentMap == null) {
      throw new IllegalStateException(
        "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
    if (spilledMaps.isEmpty) {
      CompletionIterator[(K, C), Iterator[(K, C)]](
        destructiveIterator(currentMap.iterator), freeCurrentMap())
    } else {
      new ExternalIterator()


// Initialization of the merging iterator: one buffered stream is built for the sorted
// in-memory map plus one per spilled map, and each non-empty stream is added to the
// merge heap.
// A queue that maintains a buffer for each stream we are currently merging
    // This queue maintains the invariant that it only contains non-empty buffers
    private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]

    // Input streams are derived both from the in-memory map and spilled maps on disk
    // The in-memory map is sorted in place, while the spilled maps are already in sorted order
    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

    // Read the first batch of pairs from every stream; only streams that yielded
    // at least one pair are enqueued
    inputStreams.foreach { it =>
      val kcPairs = new ArrayBuffer[(K, C)]
      readNextHashCode(it, kcPairs)
      if (kcPairs.length > 0) {
        mergeHeap.enqueue(new StreamBuffer(it, kcPairs))

將currentMap中的資料經過排序後和spillFile資料的iterator組合在一起得到inputStreams,迭代這個inputStreams,將所有非空的資料流都儲存在mergeHeap中,在ExternalIterator的next()方法中將被訪問到。

最後若需要對資料進行全域性的排序,則通過只有排序引數的ExternalSorter的insertAll方法來進行排序,和Shuffle Write一樣的這裡就不細講了。



