spark core原始碼分析15 Shuffle詳解-寫流程
Shuffle是一個比較複雜的過程,有必要詳細剖析一下內部寫的邏輯
ShuffleManager分為SortShuffleManager和HashShuffleManager
一、SortShuffleManager
每個ShuffleMapTask不會為每個Reducer生成一個單獨的檔案;相反,它會將所有的結果寫到一個本地檔案裡,同時會生成一個index檔案,Reducer可以通過這個index檔案取得它需要處理的資料。避免產生大量的檔案的直接收益就是節省了記憶體的使用和順序Disk
IO帶來的低延時。
它在寫入分割槽資料的時候,首先會根據實際情況對資料採用不同的方式進行排序操作,底線是至少按照
/** Get a writer for a given partition. Called on executors by map tasks. */ override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) : ShuffleWriter[K, V] = { val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]] shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps) new SortShuffleWriter( shuffleBlockResolver, baseShuffleHandle, mapId, context) }
shuffleMapNumber是一個HashMap<shuffleId,numMaps>
SortShuffleWriter提供write介面用於真實資料的寫磁碟,而在write介面中會使用shuffleBlockResolver與底層檔案打交道
下面看獲得SortShuffleWriter之後,呼叫write進行寫
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
write的引數其實就是呼叫了rdd的compute方法進行計算,返回的這個partition的迭代器/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
可以看到,設定了mapSideCombine的需要將aggregator和keyOrdering傳入到ExternalSorter中,否則將上面兩項引數設為None。接著呼叫insertAll方法
def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
} else if (bypassMergeSort) {
// SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
if (records.hasNext) {
spillToPartitionFiles(
WritablePartitionedIterator.fromIterator(records.map { kv =>
((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
})
)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
解釋一下內部邏輯:
(1) 如果是shouldCombine,將k-v資訊記錄到一個Array中,預設是大小是64*2,儲存格式為key0,value0,key1,value1,key2,value2...。map.changeValue方法就是將value的值不斷的呼叫mergeValue方法,去更新array中指定位置的value值。如果k-v的量達到array size的0.7時,會自動擴容。
之後呼叫maybeSpillCollection,首先判斷是否需要spill,依據是開啟spillingEnabled標誌(不開啟有OOM風險,其實上面rehash擴容的時候就應該是有OOM風險了),且讀取的元素是32的整數倍,且目前佔用的記憶體大於設定的閥值(5M),就去向shuffleMemoryManager申請記憶體(shuffleMemoryManager中有一個閥值,每個shuffle task向他申請時都會記錄一下),申請的容量是當前使用容量的2倍減去閥值(5M),如果申請成功就增加閥值。如果目前記憶體佔用量還是大於新的閥值,則必須要進行spill了,否則認為記憶體還夠用。真正spill操作之後,釋放剛才從shuffleMemoryManager中申請的記憶體以及還原閥值到初始值(5M)。
spill方法:如果partition數量<=200,且沒有設定map端的combine,就呼叫spillToPartitionFiles方法,否則呼叫spillToMergeableFile方法,之後會講到。
所以在這個分支而言,我們是shouldCombine的,所以呼叫的是spillToMergeFile方法。
需要注意的是,在spill之前,我們是有一個數據結構來儲存資料的,有map和buffer可選擇。由於shouldCombine是有可能去更新資料的,即呼叫我們的mergeValue方法之類的,所以我們用map。
(2) 如果是bypassMergeSort(partition數量<=200,且沒有設定map端的combine),呼叫的是spillToPartitionFiles方法。這種模式直接寫partition file,就沒有快取這一說了。
(3) 如果是 非shouldCombine,非bypassMergeSort,這裡因為我們不需要merge操作,直接使用buffer作為spill前的快取結構。之後呼叫maybeSpillCollection方法。
看一下spillToMergeableFile方法:
(1) 在localDirs下面的子目錄下建立一個寫shuffle的檔案
(2) 對快取中的資料進行排序,原則是按partitionID和partition內的key排序,得到的資料格式為((partitionId_0,key_0),value_0),((partitionId_0,key_1),value_1)......((partitionId_100,key_100),value_100)。
(3) 逐步往檔案裡寫,每寫10000個,sync一把。同時儲存一個spilledFile的結構在記憶體中。
也就是說,一個map task,每次spill都生成一個檔案(因為有可能一個map task有多次spill),檔案內有序。
這樣,一次spill就完成了。
看一下spillToPartitionFiles方法:
每個map task對每一個reduce 分割槽都建立一個不同的檔案,也不需要排序。
insertAll方法介紹完了,接著往下介紹。
根據shuffleId+ mapId資訊建立data檔案,呼叫writePartitionedFile方法:
(1) 如果之前是bypassMergeSort,即呼叫的是spillToPartitionFiles,就把剩餘的buffer中的資訊寫到指定的reduce分割槽對應的檔案。然後將所有的輸出檔案合併成一個data檔案
(2) 如果記憶體中沒有spilledFile的資訊,即全部的資訊都在記憶體中,就直接寫到data檔案即可
(3) 否則,也是最複雜的情況,將這個map task輸出的所有檔案,按partition進行整合到一個data檔案中,格式大概為(partition0,這個map task中分割槽0的全部資料),(partition1,這個map task中分割槽1的全部資料)......
需要注意的是,(2)和(3)的情況寫到一個data檔案中時,每個partition在data檔案中的的大小是記錄下來的。
建立data檔案相對應的index檔案,index檔案記錄了data檔案中的每個partition的起始offset。可以想象,記錄了每個partition的offset,其實就是知道了每個partition在data檔案中哪一部分。
最後將shuffleServerId(記錄了host,port、executorId),每個partition的檔案length封裝成mapStatus返回。
二、HashShuffleManager
Spark在每一個Mapper中為每個Reducer建立一個bucket,並將RDD計算結果放進bucket中。每一個bucket擁有一個DiskObjectWriter,每個write
handler擁有一個buffer size,使用這個write handler將Map output寫入檔案中。也就是說Map output的key-value pair是逐個寫入到磁碟而不是預先把所有資料儲存在記憶體中在整體flush到磁碟中去,這樣對於記憶體的壓力會小很多。當然,同時執行的map數受限於資源,所以所需記憶體大概為cores*reducer
num*buffer size。但是,當reduce數量和map數量很大的時候,所需的記憶體開銷也是驚人的。
hashShuffleManager寫的流程相對而言就簡單很多了
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
for (elem <- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
shuffle.writers(bucketId).write(elem._1, elem._2)
}
}
(1) 如果定義了mapSideCombine,同上insertAll方法中的shouldCombine分支類似,對k-v進行合併處理。否則就不做處理。
(2) 然後將所有的k-v計算需要輸出到哪個分割槽,逐個寫入指定的分割槽檔案中。
這種模式自然不需要排序,merge等複雜操作,因為最終每個map task對每一個reduce分割槽輸出一個檔案。
最後還是同樣組裝成一個mapStatus結構返回。
至此,shuffle的寫流程就介紹結束了。
下一節介紹shuffle的讀流程。