spark原始碼閱讀--shuffle過程分析
ShuffleManager(一)
本篇,我們來看一下spark核心中另一個重要的模組,Shuffle管理器ShuffleManager。shuffle可以說是分散式計算中最重要的一個概念了,資料的join,聚合去重等操作都需要這個步驟。另一方面,spark之所以比mapReduce的效能高其中一個主要的原因就是對shuffle過程的優化,一方面spark的shuffle過程更好地利用記憶體(也就是我們前面在分析記憶體管理時所說的執行記憶體),另一方面對於shuffle過程中溢寫的磁碟檔案歸併排序和引入索引檔案。當然,spark效能高的另一個主要原因還有對計算鏈的優化,把多步map型別的計算chain在一起,大大減少中間過程的落盤,這也是spark顯著區別於mr的地方。
SparkEnv初始化部分的程式碼:
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
ShuffleMapTask.runTask
看shuffle管理器的原始碼,我們首先應該ShuffleManager的呼叫時機。想一下shuffle的過程,無非就是兩個步驟,寫和讀。寫是在map階段,將資料按照一定的分割槽規則歸類到不同的分割槽中,讀是在reduce階段,每個分割槽從map階段的輸出中拉取屬於自己的資料,所以我們分析ShuffleManager原始碼基本也可以沿著這個思路。我們先來分析寫的過程,因為對於一個完整的shuffle過程,肯定是先寫然後才讀的。
回顧一下之前的對作業執行過程的分析,我們應該還記得作業被切分成任務後是在executor端執行的,而Shuffle階段的的stage被切分成了ShuffleMapTask,shuffle的寫過程正是在這個類中完成的,我們看一下程式碼:
可以看到通過ShuffleManager.getWriter獲取了一個shuffle寫入器,從而將rdd的計算資料寫入磁碟。
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// 反序列化RDD和shuffle, 關鍵的步驟
// 這裡思考rdd和shuffle反序列化時,內部的SparkContext物件是怎麼反序列化的
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
// shuffle管理器
val manager = SparkEnv.get.shuffleManager
// 獲取一個shuffle寫入器
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 這裡可以看到rdd計算的核心方法就是iterator方法
// SortShuffleWriter的write方法可以分為幾個步驟:
// 將上游rdd計算出的資料(通過呼叫rdd.iterator方法)寫入記憶體緩衝區,
// 在寫的過程中如果超過 記憶體閾值就會溢寫磁碟檔案,可能會寫多個檔案
// 最後將溢寫的檔案和記憶體中剩餘的資料一起進行歸併排序後寫入到磁碟中形成一個大的資料檔案
// 這個排序是先按分割槽排序,在按key排序
// 在最後歸併排序後寫的過程中,沒寫一個分割槽就會手動刷寫一遍,並記錄下這個分割槽資料在檔案中的位移
// 所以實際上最後寫完一個task的資料後,磁碟上會有兩個檔案:資料檔案和記錄每個reduce端partition資料位移的索引檔案
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// 主要是刪除中間過程的溢寫檔案,向記憶體管理器釋放申請的記憶體
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
SortShuffleManager.getWriter
這裡根據shuffle型別獲取不同的ShuffleWriter物件,大多數情況下,都是SortShuffleWriter型別,所以我們直接看SortShuffleWriter.write方法。
/** Get a writer for a given partition. Called on executors by map tasks. */
// 獲取一個shuffle儲存器,在executor端被呼叫,在執行一個map task呼叫
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
SortShuffleWriter.write
總結一下這個方法的主要邏輯:
- 獲取一個排序器,根據是否需要map端聚合傳遞不同的引數
- 將資料插入排序器中,這個過程或溢寫出多個磁碟檔案
- 根據shuffleid和分割槽id獲取一個磁碟檔名,
- 將多個溢寫的磁碟檔案和記憶體中的排序資料進行歸併排序,並寫到一個檔案中,同時返回每個reduce端分割槽的資料在這個檔案中的位移
- 將索引寫入一個索引檔案,並將資料檔案的檔名由臨時檔名改成正式的檔名。
- 最後封裝一個MapStatus物件,用於ShuffleMapTask.runTask的返回值。
在stop方法中還會做一些收尾工作,統計磁碟io耗時,刪除中間溢寫檔案
override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { // map端進行合併的情況,此時使用者應該提供聚合器和順序 require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. new ExternalSorter[K, V, V]( context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) } // 將map資料全部寫入排序器中, // 這個過程中可能會生成多個溢寫檔案 sorter.insertAll(records) // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). // mapId就是shuffleMap端RDD的partitionId // 獲取這個map分割槽的shuffle輸出檔名 val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) // 加一個uuid字尾 val tmp = Utils.tempFileWith(output) try { val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) // 這一步將溢寫到的磁碟的檔案和記憶體中的資料進行歸併排序, // 並溢寫到一個檔案中,這一步寫的檔案是臨時檔名 val partitionLengths = sorter.writePartitionedFile(blockId, tmp) // 這一步主要是寫入索引檔案,使用move方法原子第將臨時索引和臨時資料檔案重新命名為正常的檔名 shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) // 返回一個狀態物件,包含shuffle服務Id和各個分割槽資料在檔案中的位移 mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") } } }
IndexShuffleBlockResolver
我們首先看一下獲取shuffle輸出檔名,是通過IndexShuffleBlockResolver元件獲取的,而它的內部又是通過BlockManager內部的DiskBlockManager分配檔名的,這個DiskBlockManager我在之前分析塊管理器時提到過,它的作用就是管理檔名的分配,以及spark使用的目錄,子目錄的建立刪除等。我們看到對於資料檔案和索引檔案的命名規則是不一樣的,他們的命名規則分別定義在ShuffleDataBlockId和ShuffleIndexBlockId中。
def getDataFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
private def getIndexFile(shuffleId: Int, mapId: Int): File = {
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
ExternalSorter.insertAll
我們根據SortShuffleWriter中的呼叫順序,首先看一下ExternalSorter.insertAll方法:
- 首選根據是否在愛map端合併分為兩種情況,這兩種情況使用的記憶體儲存結構也不一樣,對於在map端合併的情況使用的是PartitionedAppendOnlyMap結構,不在map合併則使用PartitionedPairBuffer。其中,PartitionedAppendOnlyMap是用陣列和線性探測法實現的map結構。
然後將資料一條一條地迴圈插入記憶體的儲存結構中,同時考慮到map端合併的情況
def insertAll(records: Iterator[Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined // 在map端進行合併的情況 if (shouldCombine) { // Combine values in-memory first using our AppendOnlyMap val mergeValue = aggregator.get.mergeValue val createCombiner = aggregator.get.createCombiner var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { addElementsRead() kv = records.next() // 向記憶體緩衝中插入一條資料 map.changeValue((getPartition(kv._1), kv._1), update) // 如果緩衝超過閾值,就會溢寫到磁碟生成一個檔案 // 每寫入一條資料就檢查一遍記憶體 maybeSpillCollection(usingMap = true) } } else {// 不再map端合併的情況 // Stick values into our buffer while (records.hasNext) { addElementsRead() val kv = records.next() buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) maybeSpillCollection(usingMap = false) } } }
AppendOnlyMap.changeValue
我們看一個稍微複雜一點的結構,AppendOnlyMap,
- 首先考慮空值的情況
- 計算key的hash,然後對容量取餘。注意,這裡由於容量是2的整數次冪,所以對容量取餘的操作等同於和容量-1進行位與操作,java HashMap中的操作。
- 如果,不存在舊值,那麼直接插入,
- 如果存在舊值,更新舊值
- 如果發生hash碰撞,那麼需要向後探測,並且是跳躍性的探測,
可以看出,這個結構設計還是很精良的,這裡面有個很重的方法,incrementSize方法中會判斷當前資料量的大小,如果超過閾值就會擴容,這個擴容的方法比較複雜,就是一個重新hash再分佈的過程,不過有一點,發不論是在插入新資料還是重新hash再分佈的過程中,對於hash碰撞的處理策略一定要相同,否則可能造成不一致。
// 向陣列中插入一個kv對,
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
// 處理key為空的情況
if (k.eq(null)) {
// 如果是第一次插入空值,那麼需要將大小增加1
if (!haveNullValue) {
incrementSize()
}
nullValue = updateFunc(haveNullValue, nullValue)
haveNullValue = true
return nullValue
}
var pos = rehash(k.hashCode) & mask
// 線性探測法處理hash碰撞
// 這裡是一個加速的線性探測,即第一次碰撞時走1步,
// 第二次碰撞時走2步,第三次碰撞時走3步
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {// 如果舊值不存在,直接插入
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
incrementSize()
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) {// 如果舊值存在,需要更新
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {// 發生hash碰撞,向後探測,跳躍性的探測
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
ExternalSorter.maybeSpillCollection
我們回到ExternalSorter的插入方法中,沒插入一條資料都要檢查記憶體佔用,判斷是否需要溢寫到磁碟,如果需要就溢寫到磁碟。
這個方法裡呼叫了map.estimateSize來估算當前插入的資料的記憶體佔用大小,對於記憶體佔用的追蹤和估算的功能是在SizeTracker特質中實現的,這個特質我在之前分析MemoryStore時提到過,在將物件型別的資料插入記憶體中時使用了一箇中間態的資料結構DeserializedValuesHolder,它的內部有一個SizeTrackingVector,這個類就是通過繼承SizeTracker特徵從而實現物件大小的追蹤和估算。
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
ExternalSorter.maybeSpill
首先檢查當前記憶體佔用是否超過閾值,如果超過會申請一次執行記憶體,如果沒有申請到足夠的執行記憶體,那麼依然需要溢寫到磁碟
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
// 每寫入32條資料檢查一次
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
// 向記憶體管理器申請執行記憶體
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
// 如果記憶體佔用超過了閾值,那麼就需要溢寫
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
// 溢寫到磁碟
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
// 釋放記憶體
releaseMemory()
}
shouldSpill
}
ExternalSorter.spill
接著上面的方法,
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
// 獲取一個排序後的迭代器
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
// 將資料寫入磁碟檔案中
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}
WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator
這個方法返回按照分割槽和key排序過的迭代器,其具體的排序邏輯在AppendOnlyMap.destructiveSortedIterator中
AppendOnlyMap.destructiveSortedIterator
這段程式碼分為兩塊,首先對陣列進行壓緊,是的稀疏的資料全部轉移到陣列的頭部;
然後對陣列按照比較器進行排序,比較器首先是按照分割槽進行比較,如果分割槽相同才按照key進行比較;
然後返回一個迭代器,這個迭代器僅僅是對陣列的封裝。通過這個方法,我們大概知道了AppendonlyMap的排序邏輯。
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
// 這段程式碼將稀疏的資料全部轉移到陣列頭部,將資料壓緊
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
data(2 * newIndex) = data(2 * keyIndex)
data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
// 根據比較器對資料進行排序
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
new Iterator[(K, V)] {
var i = 0
var nullValueReady = haveNullValue
def hasNext: Boolean = (i < newIndex || nullValueReady)
def next(): (K, V) = {
if (nullValueReady) {
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
i += 1
item
}
}
}
}
ExternalSorter.spillMemoryIteratorToDisk
回到ExternalSorter.spill方法中,在獲取了經過排序後 的迭代器之後,我們就可以將資料溢寫到磁碟上了。
這個方法的程式碼我不貼了,總結一下主要步驟:
- 首先通過DiskBlockManager獲取一個臨時塊的BlockId和臨時檔名
- 通過blockManager獲取一個磁碟寫入器,即DiskBlockObjectWriter物件,內部封裝了呼叫java流api寫檔案的邏輯
- 迴圈將每條資料寫入磁碟,並定期進行刷寫(每隔一定的資料條數將記憶體中的資料刷寫到磁碟上)
- 如果發生異常,則會對之前寫入的檔案進行回滾
小結
總結一下資料通過ExternalSorter向磁碟溢寫的全過程:
- 首先,資料會被一條一條地向內部的map結構中插入
- 每插入一條資料都會檢查記憶體佔用情況,如果記憶體佔用超過閾值,並且申請不到足夠的執行記憶體,就會將目前記憶體中的資料溢寫到磁碟
- 對於溢寫的過程:首先會將資料按照分割槽和key進行排序,相同分割槽的資料排在一起,然後根據提供的排序器按照key的順序排;然後通過DiskBlockManager和BlockManager獲取DiskBlockWriter將資料寫入磁碟形成一個檔案。,並將溢寫的檔案資訊
- 在整個寫入過程中,會溢寫多個檔案
ExternalSorter.writePartitionedFile
總結一下主要的步驟:
- 仍然是通過blockManager獲取一個磁碟寫入器
- 將內部溢寫的多個磁碟檔案和滯留在記憶體的資料進行歸併排序,並分裝成一個按照分割槽歸類的迭代器
迴圈將資料寫入磁碟,每當一個分割槽的資料寫完後,進行一次刷寫,將資料從os的檔案緩衝區同步到磁碟上,然後獲取此時的檔案長度,記錄下每個分割槽在檔案中的位移
def writePartitionedFile( blockId: BlockId, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics().shuffleWriteMetrics) // 如果前面沒有資料溢寫到磁碟中, // 則只需要將記憶體中的資料溢寫到磁碟 if (spills.isEmpty) { // Case where we only have in-memory data val collection = if (aggregator.isDefined) map else buffer // 返回排序後的迭代器 val it = collection.destructiveSortedWritablePartitionedIterator(comparator) while (it.hasNext) { val partitionId = it.nextPartition() while (it.hasNext && it.nextPartition() == partitionId) { it.writeNext(writer) } // 寫完一個分割槽刷寫一次 val segment = writer.commitAndGet() // 記錄下分割槽的資料在檔案中的位移 lengths(partitionId) = segment.length } } else {// 有溢寫到磁碟的檔案 // We must perform merge-sort; get an iterator by partition and write everything directly. // 封裝一個用於歸併各個溢寫檔案以及記憶體緩衝區資料的迭代器 // TODO 這個封裝的迭代器是實現歸併排序的關鍵 for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { for (elem <- elements) { writer.write(elem._1, elem._2) } // 每寫完一個分割槽,主動刷寫一次,獲取檔案位移, // 這個位移就是寫入的分割槽的位移, // reduce端在拉取資料時就會根據這個位移直接找到應該拉取的資料的位置 val segment = writer.commitAndGet() lengths(id) = segment.length } } } writer.close() // 寫完後更新一些統計資訊 context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes) // 返回每個reduce端分割槽資料在檔案中的位移資訊 lengths }
IndexShuffleBlockResolver.writeIndexFileAndCommit
仍然回到SortShuffleWriter.write方法,最後一步呼叫了IndexShuffleBlockResolver.writeIndexFileAndCommit方法,
這個方法的作用主要是將每個的分割槽的位移值寫入到一個索引檔案中,並將臨時的索引檔案和臨時的資料檔案重新命名為正常的檔名(重新命名操作是一個原子操作)
總結
我總結shuffle寫資料的過程,可以分為兩個主要的步驟:
- 一是在資料寫入的過程中會由於記憶體不足從而溢寫多個數據檔案到磁碟中,而所有的檔案都是按照分割槽和key排序的,這為第二部歸併排序打下基礎
- 第二部就是將這些溢寫的小檔案和最後記憶體中剩下的資料進行歸併排序,然後寫入一個大檔案中,並且在寫入的過程中記錄每個分割槽資料在檔案中的位移,
- 最後還要寫入一個索引檔案,索引檔案即記錄了每個reduce端分割槽在資料檔案中的位移,這樣reduce在拉取資料的時候才能很快定位到自己分割槽所需要的資料