1. 程式人生 > >[spark] Shuffle Write解析 (Sort Based Shuffle)

[spark] Shuffle Write解析 (Sort Based Shuffle)

本文基於 Spark 2.1 進行解析

前言

從 Spark 2.0 開始移除了Hash Based Shuffle,想要了解可參考Shuffle 過程,本文將講解 Sort Based Shuffle。

ShuffleMapTask的結果(ShuffleMapStage中FinalRDD的資料)都將寫入磁碟,以供後續Stage拉取,即整個Shuffle包括前Stage的Shuffle Write和後Stage的Shuffle Read,由於內容較多,本文先解析Shuffle Write。

概述:

  • 寫records到記憶體緩衝區(一個數組維護的map),每次insert&update都需要檢查是否達到溢寫條件。
  • 若需要溢寫,將集合中的資料根據partitionId和key(若需要)排序後順序溢寫到一個臨時的磁碟檔案,並釋放記憶體新建一個map放資料,每次溢寫都是寫一個新的臨時檔案。
  • 一個task最終對應一個檔案,將還在記憶體中的資料和已經spill的檔案根據reduce端的partitionId進行合併,合併後需要再次聚合排序(若需要),再根據partition的順序寫入最終檔案,並返回每個partition在檔案中的偏移量,最後以MapStatus物件返回給driver並註冊到MapOutputTrackerMaster中,後續reduce好通過它來訪問。

入口

執行一個ShuffleMapTask最終的執行邏輯是呼叫了ShuffleMapTask類的runTask()方法:

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // 從廣播變數中反序列化出finalRDD和dependency
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    var
writer: ShuffleWriter[Any, Any] = null try { // 獲取shuffleManager val manager = SparkEnv.get.shuffleManager // 通過shuffleManager的getWriter()方法,獲得shuffle的writer writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // 通過rdd指定分割槽的迭代器iterator方法來遍歷每一條資料,再之上再呼叫writer的write方法以寫資料 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }

其中的finalRDD和dependency是在Driver端DAGScheluer中提交Stage的時候加入廣播變數的。

接著通過SparkEnv獲取shuffleManager,預設使用的是sort(對應的是org.apache.spark.shuffle.sort.SortShuffleManager),可通過spark.shuffle.manager設定。

然後呼叫了manager.getWriter方法,該方法中檢測到滿足Unsafe Shuffle條件會自動採用Unsafe Shuffle,否則採用Sort Shuffle。使用Unsafe Shuffle有幾個限制,shuffle階段不能有aggregate操作,分割槽數不能超過一定大小(224−1,這是可編碼的最大parition id),所以像reduceByKey這類有aggregate操作的運算元是不能使用Unsafe Shuffle。

這裡暫時討論Sort Shuffle的情況,即getWriter返回的是SortShuffleWriter,我們直接看writer.write發生了什麼:

override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    // 寫記憶體緩衝區,超過閾值則溢寫到磁碟檔案
    sorter.insertAll(records)
    // 獲取該task的最終輸出檔案
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      // merge後寫到data檔案
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      // 寫index檔案
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }
  • 通過判斷是否有map端的combine來建立不同的ExternalSorter,若有則將對應的aggregator和keyOrdering作為引數傳入。
  • 呼叫sorter.insertAll(records),將records寫入記憶體緩衝區,超過閾值則溢寫到磁碟檔案。
  • Merge記憶體記錄和所有被spill到磁碟的檔案,並寫到最終的資料檔案.data中。
  • 將每個partition的偏移量寫到index檔案中。

先細看sorter.inster是怎麼寫到記憶體,並spill到磁碟檔案的:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined
    // 若需要Combine
    if (shouldCombine) {
      // 獲取對新value合併到聚合結果中的函式
      val mergeValue = aggregator.get.mergeValue
      // 獲取建立初始聚合值的函式
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      // 通過mergeValue 對已有的聚合結果的新value進行合併,通過createCombiner 對沒有聚合結果的新value初始化聚合結果
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      // 遍歷records
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        // 使用update函式進行value的聚合
        map.changeValue((getPartition(kv._1), kv._1), update)
        // 是否需要spill到磁碟檔案
        maybeSpillCollection(usingMap = true)
      }
    // 不需要Combine
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }
  • 需要聚合的情況,遍歷records拿到record的KV,通過map的changeValue方法並根據update函式來對相同K的V進行聚合,這裡的map是PartitionedAppendOnlyMap型別,只能新增資料不能刪除資料,底層實現是一個數組,陣列中存KV鍵值對的方式是[K1,V1,K2,V2…],每一次操作後都會判斷是否要spill到磁碟。

  • 不需要聚合的情況,直接將record放入buffer,然後判斷是否要溢寫到磁碟。

先看map.changeValue方法到底是怎麼通過map實現對資料combine的:

override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    // 通過聚合演算法得到newValue
    val newValue = super.changeValue(key, updateFunc)
    // 跟新對map的大小取樣
    super.afterUpdate()
    newValue
  }

super.changeValue的實現:

def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    ...
    // 根據k 得到pos
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      // 從data中獲取該位置的原來的key
      val curKey = data(2 * pos)  
      // 若原來的key和當前的key相等,則將兩個值進行聚合
      if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
       // 若當前key對應的位置沒有key,則將當前key作為該位置的key
       // 並通過update方法初始化該位置的聚合結果
      } else if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        // 擴容
        incrementSize()
        return newValue
      // 若對應位置有key但不和當前key相等,即hash衝突了,則繼續向後遍歷
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

根據K的hashCode再雜湊與上掩碼 得到 pos,2 * pos 為 k 應該所在的位置,2 * pos + 1 為 k 對應的 v 所在的位置,獲取k應該所在位置的原來的key:

  • 若原來的key和當前的 k 相等,則通過update函式將兩個v進行聚合並更新該位置的value
  • 若原來的key存在但不和當前的k 相等,則說明hash衝突了,更新pos繼續遍歷
  • 若原來的key不存在,則將當前k作為該位置的key,並通過update函式初始化該k對應的聚合結果,接著會通過incrementSize()方法進行擴容:

    private def incrementSize() {
      curSize += 1
      if (curSize > growThreshold) {
        growTable()
      }
    }

    跟新curSize,若當前大小超過了閾值growThreshold(growThreshold是當前容量capacity的0.7倍),則通過growTable()來擴容:

protected def growTable() {
    // 容量翻倍
    val newCapacity = capacity * 2
    require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
    //生成新的陣列來存資料
    val newData = new Array[AnyRef](2 * newCapacity)
    val newMask = newCapacity - 1
    var oldPos = 0
    while (oldPos < capacity) {
      // 將舊陣列中的資料重新計算位置放到新的陣列中
      if (!data(2 * oldPos).eq(null)) {
        val key = data(2 * oldPos)
        val value = data(2 * oldPos + 1)
        var newPos = rehash(key.hashCode) & newMask
        var i = 1
        var keepGoing = true
        while (keepGoing) {
          val curKey = newData(2 * newPos)
          if (curKey.eq(null)) {
            newData(2 * newPos) = key
            newData(2 * newPos + 1) = value
            keepGoing = false
          } else {
            val delta = i
            newPos = (newPos + delta) & newMask
            i += 1
          }
        }
      }
      oldPos += 1
    }
    // 替換及跟新變數
    data = newData
    capacity = newCapacity
    mask = newMask
    growThreshold = (LOAD_FACTOR * newCapacity).toInt
  }

這裡重新建立了一個兩倍capacity 的陣列來存放資料,將原來陣列中的資料通過重新計算位置放到新數組裡,將data替換為新的陣列,並跟新一些變數。

此時聚合已經完成,回到changeValue方面裡面,接下來會執行super.afterUpdate()方法來對map的大小進行取樣:

protected def afterUpdate(): Unit = {
    numUpdates += 1
    if (nextSampleNum == numUpdates) {
      takeSample()
    }
  }

若每遍歷跟新一條record,都來對map進行取樣估計大小,假設取樣一次需要1ms,100w次取樣就會花上16.7分鐘,效能大大降低。所以這裡只有當update次數達到nextSampleNum 的時候才通過takeSample()取樣一次:

private def takeSample(): Unit = {
    samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
    // Only use the last two samples to extrapolate
    if (samples.size > 2) {
      samples.dequeue()
    }
    // 估計每次跟新的變化量
    val bytesDelta = samples.toList.reverse match {
      case latest :: previous :: tail =>
        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
      // If fewer than 2 samples, assume no change
      case _ => 0
    }
    // 跟新變化量
    bytesPerUpdate = math.max(0, bytesDelta)
    // 獲取下次取樣的次數
    nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
  }

這裡估計每次跟新的變化量的邏輯是:(當前map大小-上次取樣的時候的大小) / (當前update的次數 - 上次取樣的時候的update次數)。

接著計算下次需要取樣的update次數,該次數是指數級增長的,基數是1.1,第一次取樣後,要1.1次進行第二次取樣,第1.1*1.1次後進行第三次取樣,以此類推,開始增長慢,後面增長跨度會非常大。

這裡取樣完成後回到insetAll方法,接著通過maybeSpillCollection方法判斷是否需要spill:

 private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }

通過集合的estimateSize方法估計map的大小,若需要spill則將集合中的資料spill到磁碟檔案,並且為集合建立一個新的物件放資料。先看看估計大小的方法estimateSize:

 def estimateSize(): Long = {
    assert(samples.nonEmpty)
    val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
    (samples.last.size + extrapolatedDelta).toLong
  }

以上次取樣完更新的bytePerUpdate 作為最近平均每次跟新的大小,估計當前佔用記憶體:(當前update次數-上次取樣時的update次數) * 每次跟新大小 + 上次取樣記錄的大小。

獲取到當前集合的大小後呼叫maybeSpill判斷是否需要spill:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      // 跟新申請到的記憶體
      myMemoryThreshold += granted 
      // 集合大小還是比申請到的記憶體大?spill : no spill
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

這裡有兩種情況都可導致spill:

  • 當前集合包含的records數超過了 numElementsForceSpillThreshold(預設為Long.MaxValue,可通過spark.shuffle.spill.numElementsForceSpillThreshold設定)
  • 當前集合包含的records數為32的整數倍,並且當前集合的大小超過了申請的記憶體myMemoryThreshold(第一次申請預設為5 * 1024 * 1024,可通過spark.shuffle.spill.initialMemoryThreshold設定),此時並不會立即spill,會嘗試申請更多的記憶體避免spill,這裡嘗試申請的記憶體為2倍集合大小減去當前已經申請的記憶體大小(實際申請到的記憶體為granted),若加上原來的記憶體還是比當前集合的大小要小則需要spill。

若需要spill,則跟新spill次數,呼叫spill(collection)方法進行溢寫磁碟,並釋放記憶體。
跟進spill方法看看其具體實現:

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    // 傳入comparator將集合中的資料先根據partition排序再通過key排序後返回一個迭代器
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    // 寫到磁碟檔案,並返回一個對該檔案的描述物件SpilledFile
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    // 新增到spill檔案陣列
    spills.append(spillFile)
  }

繼續跟進看看spillMemoryIteratorToDisk的實現:

private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
      : SpilledFile = {
    // 生成臨時檔案和blockId
    val (blockId, file) = diskBlockManager.createTempShuffleBlock()

    // 這些值在每次flush後會被重置
    var objectsWritten: Long = 0
    var spillMetrics: ShuffleWriteMetrics = null
    var writer: DiskBlockObjectWriter = null
    def openWriter(): Unit = {
      assert (writer == null && spillMetrics == null)
      spillMetrics = new ShuffleWriteMetrics
      writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
    }
    openWriter()

    // 按寫入磁碟的順序記錄分支的大小
    val batchSizes = new ArrayBuffer[Long]

    // 記錄每個分割槽有多少元素
    val elementsPerPartition = new Array[Long](numPartitions)

    // Flush  writer 內容到磁碟,並更新相關變數
    def flush(): Unit = {
      val w = writer
      writer = null
      w.commitAndClose()
      _diskBytesSpilled += spillMetrics.bytesWritten
      batchSizes.append(spillMetrics.bytesWritten)
      spillMetrics = null
      objectsWritten = 0
    }

    var success = false
    try {
      // 遍歷迭代器
      while (inMemoryIterator.hasNext) {
        val partitionId = inMemoryIterator.nextPartition()
        require(partitionId >= 0 && partitionId < numPartitions,
          s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
        inMemoryIterator.writeNext(writer)
        elementsPerPartition(partitionId) += 1
        objectsWritten += 1
        // 元素個數達到批量序列化大小則flush到磁碟
        if (objectsWritten == serializerBatchSize) {
          flush()
          openWriter()
        }
      }
      // 將剩餘的資料flush
      if (objectsWritten > 0) {
        flush()
      } else if (writer != null) {
        val w = writer
        writer = null
        w.revertPartialWritesAndClose()
      }
      success = true
    } finally {
        ...
    }
    // 返回SpilledFile
    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
  }

通過diskBlockManager建立臨時檔案和blockID,臨時檔名格式為是 “temp_shuffle_” + id,遍歷記憶體資料迭代器,並呼叫Writer(DiskBlockObjectWriter)的write方法,當寫的次數達到序列化大小則flush到磁碟檔案,並重新開啟writer,及跟新batchSizes等資訊。

最後返回一個SpilledFile物件,該物件包含了溢寫的臨時檔案File,blockId,每次flush的到磁碟的大小,每個partition對應的資料條數。

spill完成,並且insertAll方法也執行完成,回到開始的SortShuffleWriter的write方法:

override def write(records: Iterator[Product2[K, V]]): Unit = {
    ...
    // 寫記憶體緩衝區,超過閾值則溢寫到磁碟檔案
    sorter.insertAll(records)
    // 獲取該task的最終輸出檔案
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      // merge後寫到data檔案
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      // 寫index檔案shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }

獲取最後的輸出檔名及blockId,檔案格式:

 "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

接著通過sorter.writePartitionedFile方法來寫檔案,其中包括記憶體及所有spill檔案的merge操作,看看起具體實現:

def writePartitionedFile(
      blockId: BlockId,
      outputFile: File): Array[Long] = {

    val writeMetrics = context.taskMetrics().shuffleWriteMetrics

    // 跟蹤每個分割槽在檔案中的range
    val lengths = new Array[Long](numPartitions)
    // 資料只存在記憶體中
    if (spills.isEmpty) { 
      val collection = if (aggregator.isDefined) map else buffer
      // 將記憶體中的資料先通過partitionId再通過k排序後返回一個迭代器
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      // 遍歷資料寫入磁碟
      while (it.hasNext) {
        val writer = blockManager.getDiskWriter(
          blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
        val partitionId = it.nextPartition()
        //等待一個partition的資料寫完後重新整理到磁碟檔案
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        writer.commitAndClose()
        val segment = writer.fileSegment()
        // 記錄每個partition資料長度
        lengths(partitionId) = segment.length
      }
    } else {
      // 有資料spill到磁碟,先merge
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          val writer = blockManager.getDiskWriter(
            blockId, outputFile, serInstance, fileBufferSize, writeMetrics)
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          writer.commitAndClose()
          val segment = writer.fileSegment()
          lengths(id) = segment.length
        }
      }
    }

    context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

    lengths
  }
  • 資料只存在記憶體中而沒有spill檔案,根據傳入的比較函式comparator來對集合裡的資料先根據partition排序再對裡面的key排序並返回一個迭代器,遍歷該迭代器得到所有recored,每一個partition對應一個writer,一個partition的資料寫完後再flush到磁碟檔案,並記錄該partition的資料長度。
  • 資料有spill檔案,通過方法partitionedIterator對記憶體和spill檔案的資料進行merge-sort後返回一個(partitionId,對應分割槽的資料的迭代器)的迭代器,也是一個partition對應一個Writer,寫完一個partition再flush到磁碟,並記錄該partition資料的長度。

接下來看看通過this.partitionedIterator方法是怎麼將記憶體及spill檔案的資料進行merge-sort的:

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
    val usingMap = aggregator.isDefined
    val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
    if (spills.isEmpty) {
      if (!ordering.isDefined) {
        // 只根據partitionId排序,不需要對key排序
        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      } else {
        // 需要對partitionID和key進行排序
        groupByPartition(destructiveIterator(
          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
      }
    } else {
      // Merge spilled and in-memory data
      merge(spills, destructiveIterator(
        collection.partitionedDestructiveSortedIterator(comparator)))
    }
  }

這裡在有spill檔案的情況下會執行下面的merge方法,傳入的是spill檔案陣列和記憶體中的資料進過partitionId和key排序後的資料迭代器,看看merge:

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    // 每個檔案對應一個Reader
    val readers = spills.map(new SpillReader(_)) 
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      // 獲取記憶體中當前partition對應的Iterator
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      // 將spill檔案對應的partition的資料與記憶體中對應partition資料合併
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        // 對key進行聚合並排序
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        // 排序
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

merge方法將屬於同一個reduce端的partition的記憶體資料和spill檔案資料合併起來,再進行聚合排序(有需要的話),最後返回(reduce對應的partitionId,該分割槽資料迭代器)

將資料merge-sort後寫入最終的檔案後,需要將每個partition的偏移量持久化到檔案以供後續每個reduce根據偏移量獲取自己的資料,寫偏移量的邏輯很簡單,就是根據前面得到的partition長度的陣列將偏移量寫到index檔案中,對應的檔名為:

def writeIndexFileAndCommit(
      shuffleId: Int,
      mapId: Int,
      lengths: Array[Long],
      dataTmp: File): Unit = {
    val indexFile = getIndexFile(shuffleId, mapId)
    val indexTmp = Utils.tempFileWith(indexFile)
    try {
      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
      Utils.tryWithSafeFinally {
        // We take in lengths of each block, need to convert it to offsets.
        var offset = 0L
        out.writeLong(offset)
        for (length <- lengths) {
          offset += length
          out.writeLong(offset)
        }
      } 
    ......
    }
  }

根據shuffleId和mapId獲取index檔案並建立一個寫檔案的檔案流,按照reduce端partition對應的offset依次寫到index檔案中,如:
0,
length(partition1),
length(partition1)+length(partition2),
length(partition1)+length(partition2)+length(partition3)

最後建立一個MapStatus例項返回,包含了reduce端每個partition對應的偏移量。

該物件將返回到Driver端的DAGScheluer處理,被新增到對應stage的OutputLoc裡,當該stage的所有task完成的時候會將這些結果註冊到MapOutputTrackerMaster,以便下一個stage的task就可以通過它來獲取shuffle的結果的元資料資訊。

至此Shuffle Write完成!