Spark shuffle write process

1. ShuffleMapTask's runTask() method

override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

First we obtain the shuffleManager. There are three kinds: SortShuffleManager, HashShuffleManager and UnsafeShuffleManager; here we focus on SortShuffleManager. From the shuffleManager we then get a SortShuffleWriter, and its write() method is called to write the data out as shuffle files.
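
As a minimal sketch (a hypothetical local SparkConf, not part of the source above): in Spark 1.x the manager is selected through the spark.shuffle.manager setting, where "sort" (the default) maps to SortShuffleManager, "hash" to HashShuffleManager and "tungsten-sort" to UnsafeShuffleManager.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-write-demo")
  .setMaster("local[2]")
  .set("spark.shuffle.manager", "sort") // "hash" / "tungsten-sort" would select the other two managers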

2. SortShuffleWriter's write() method

override def write(records: Iterator[Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }
First an ExternalSorter object is created and all records are inserted into it. Finally the sorted data is written to disk: each map task produces a single data file containing the data for every reduce partition, plus an index file that tells each reducer where its block starts.
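
The index file written by shuffleBlockResolver.writeIndexFile() is just the running total of partitionLengths, i.e. cumulative byte offsets into the data file. A minimal sketch of that computation (hypothetical lengths, not the resolver's actual code):

// bytes written for each of four reduce partitions (hypothetical values)
val partitionLengths = Array[Long](120L, 0L, 340L, 75L)
// the index file stores the running totals, starting at 0
val offsets = partitionLengths.scanLeft(0L)(_ + _) // Array(0, 120, 120, 460, 535)
// reducer i reads bytes [offsets(i), offsets(i + 1)) from the single data file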

3. ExternalSorter's insertAll() method

def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    } else if (bypassMergeSort) {
      // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
      if (records.hasNext) {
        spillToPartitionFiles(
          WritablePartitionedIterator.fromIterator(records.map { kv =>
            ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
          })
        )
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

When map-side combine is enabled, ExternalSorter stores the incoming records in a PartitionedAppendOnlyMap (otherwise they go into a PartitionedPairBuffer). After every record is inserted, maybeSpillCollection() is called to check whether the in-memory collection has to be spilled to disk.
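
Which branch of insertAll() is taken depends on the shuffle dependency that produced the records. A rough illustration (a hypothetical pair RDD on an existing SparkContext sc):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _) // mapSideCombine = true:  records are combined in the PartitionedAppendOnlyMap
pairs.groupByKey()       // mapSideCombine = false: records are buffered without combining
pairs.sortByKey()        // no aggregator or ordering is passed to the sorter; keys are sorted on the reduce side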

4. ExternalSorter's maybeSpillCollection() method

  private def maybeSpillCollection(usingMap: Boolean): Unit = {
    if (!spillingEnabled) {
      return
    }

    if (usingMap) {
      if (maybeSpill(map, map.estimateSize())) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      if (maybeSpill(buffer, buffer.estimateSize())) {
        buffer = if (useSerializedPairBuffer) {
          new PartitionedSerializedPairBuffer[K, C](metaInitialRecords, kvChunkSize, serInstance)
        } else {
          new PartitionedPairBuffer[K, C]
        }
      }
    }
  }
estimateSize() estimates how much memory the PartitionedAppendOnlyMap (or the buffer) currently occupies. It does not traverse the whole collection on every call; the size is re-sampled at exponentially growing intervals so that the cost of the estimation stays bounded.
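
A simplified sketch of that exponential-interval sampling idea (this is not Spark's actual SizeTracker, which uses a growth factor of about 1.1 and also extrapolates between samples):

// Hypothetical helper: re-measures the collection only when the update count
// reaches the next threshold, which grows geometrically.
class SampledSizeTracker(growthRate: Double = 1.1) {
  private var numUpdates = 0L      // updates since the tracker was created
  private var nextSampleAt = 1L    // update count at which to measure again
  private var lastSampledSize = 0L // size obtained at the last sample point

  // Call after every insert/update; measureNow is the expensive size estimation.
  def afterUpdate(measureNow: => Long): Unit = {
    numUpdates += 1
    if (numUpdates >= nextSampleAt) {
      lastSampledSize = measureNow
      nextSampleAt = math.max(numUpdates + 1, (numUpdates * growthRate).toLong)
    }
  }

  def estimateSize(): Long = lastSampledSize
}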