Spark shuffle write process
1. ShuffleMapTask's runTask() method
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
The task first obtains the shuffleManager. There are three kinds of shuffle manager: SortShuffleManager, HashShuffleManager, and UnsafeShuffleManager; here we focus on SortShuffleManager. From the shuffle manager the task then gets a SortShuffleWriter and calls its write() method to write the task's output to the shuffle file.
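For orientation, here is a minimal, self-contained job that exercises this write path. This is an illustrative sketch, not code from the source; it only assumes a local SparkContext, and the object name is made up:

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleWriteDemo {
  def main(args: Array[String]): Unit = {
    // reduceByKey introduces a ShuffleDependency, so every map task runs
    // ShuffleMapTask.runTask() and hands its output to a ShuffleWriter.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-write-demo"))
    val counts = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // map-side combine => ExternalSorter gets an Aggregator
    counts.collect().foreach(println)
    sc.stop()
  }
}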
2. SortShuffleWriter's write() method
An ExternalSorter is created first and all records are inserted into it. The sorter's contents are then written to disk: each map task produces a single data file plus a single index file, and every reducer later reads its own contiguous segment of that data file.

override def write(records: Iterator[Product2[K, V]]): Unit = {
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    sorter = new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    sorter.insertAll(records)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
    sorter.insertAll(records)
  }

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
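The index file written from partitionLengths is essentially the cumulative offsets of the partitions: reducer i later reads the byte range [offset(i), offset(i+1)) out of the single data file. A standalone sketch of that cumulative-sum idea (illustrative only, not IndexShuffleBlockResolver's actual code):

object IndexFileSketch {
  // Given each reducer partition's length in the data file, compute the
  // offsets an index file would record: numPartitions + 1 entries in total.
  def offsetsFromLengths(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  def main(args: Array[String]): Unit = {
    val lengths = Array(100L, 0L, 250L, 75L)
    // Reducer 2 would read bytes [100, 350) of this map task's data file.
    println(offsetsFromLengths(lengths).mkString(", "))   // 0, 100, 100, 350, 425
  }
}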
3. ExternalSorter's insertAll() method
def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else if (bypassMergeSort) {
    // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
    if (records.hasNext) {
      spillToPartitionFiles(
        WritablePartitionedIterator.fromIterator(records.map { kv =>
          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        })
      )
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
The in-memory structure ExternalSorter uses to hold records is a PartitionedAppendOnlyMap (a PartitionedPairBuffer when there is no map-side combine). After each record is inserted, maybeSpillCollection() is called to check whether the in-memory collection needs to be spilled to disk.
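The update closure keyed by (partition, key) is what implements map-side combining: createCombiner for the first value seen for a key, mergeValue for every later one. A standalone sketch of the same semantics using an ordinary mutable map (illustrative only, not PartitionedAppendOnlyMap itself):

import scala.collection.mutable

object MapSideCombineSketch {
  def main(args: Array[String]): Unit = {
    // Aggregator pieces for a word count: create a combiner from the first
    // value of a key, then merge each further value into it.
    val createCombiner: Int => Int = v => v
    val mergeValue: (Int, Int) => Int = (c, v) => c + v
    val numPartitions = 2
    def getPartition(key: String): Int = math.abs(key.hashCode) % numPartitions

    val records = Seq("a" -> 1, "b" -> 1, "a" -> 1)
    // Keyed by (partition, key), like ExternalSorter's in-memory map.
    val combined = mutable.Map.empty[(Int, String), Int]
    for ((k, v) <- records) {
      val mapKey = (getPartition(k), k)
      combined(mapKey) = combined.get(mapKey) match {
        case Some(old) => mergeValue(old, v)   // key already had a value: merge
        case None      => createCombiner(v)    // first value for this key
      }
    }
    combined.foreach(println)
  }
}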
4. ExternalSorter's maybeSpillCollection() method
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  if (!spillingEnabled) {
    return
  }

  if (usingMap) {
    if (maybeSpill(map, map.estimateSize())) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    if (maybeSpill(buffer, buffer.estimateSize())) {
      buffer = if (useSerializedPairBuffer) {
        new PartitionedSerializedPairBuffer[K, C](metaInitialRecords, kvChunkSize, serInstance)
      } else {
        new PartitionedPairBuffer[K, C]
      }
    }
  }
}
estimateSize() estimates the memory footprint of the PartitionedAppendOnlyMap. The expensive size measurements are taken at exponentially growing update intervals, which keeps the overhead of size estimation low.
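A standalone sketch of that idea, with the expensive measurement taken only at exponentially spaced update counts (the class name and the 1.1 growth factor here are illustrative assumptions; Spark's real logic lives in its SizeTracker helper):

object SizeSamplingSketch {
  // Re-measure the collection's size only at exponentially spaced update
  // counts, so the (expensive) measurement is amortised across inserts.
  final class SampledSizeTracker(growthRate: Double = 1.1) {
    private var numUpdates = 0L
    private var nextSampleAt = 1L
    private var lastEstimate = 0L

    def afterUpdate(measure: () => Long): Long = {
      numUpdates += 1
      if (numUpdates >= nextSampleAt) {
        lastEstimate = measure()                                   // expensive size measurement
        nextSampleAt = math.ceil(numUpdates * growthRate).toLong   // next sample comes later
      }
      lastEstimate
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new SampledSizeTracker()
    val buffer = scala.collection.mutable.ArrayBuffer.empty[Int]
    for (i <- 1 to 1000) {
      buffer += i
      tracker.afterUpdate(() => buffer.length.toLong * 4)   // stand-in for estimateSize()
    }
  }
}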