spark shuffle寫操作三部曲之UnsafeShuffleWriter
前言
在前兩篇文章 spark shuffle的寫操作之準備工作 中引出了spark shuffle的三種實現,spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 講述了BypassMergeSortShuffleWriter 用於shuffle寫操作的具體細節,實現相對比較樸素,值得學習。本篇文章,主要剖析了 UnsafeShuffleWriter用作寫shuffle資料的具體細節,它在 BypassMergeSortShuffleWriter 的思路上更進一步,建議先看 spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter,再來看本篇文章。下面先來看UnsafeShuffleWriter的主要依賴實現類 -- ShuffleExternalSorter。
sort-based shuffle的外部sorter -- ShuffleExternalSorter
在看本小節之前,建議先參照 spark 原始碼分析之二十二-- Task的記憶體管理 對任務的記憶體管理做一下詳細的瞭解,因為ShuffleExternalSorter使用了記憶體的排序。任務在做大資料量的記憶體操作時,記憶體是需要管理的。
在正式剖析之前,先剖析其依賴類。
依賴之記錄block元資訊-- SpillInfo
它記錄了block的一些元資料資訊。
其類結構如下:
其中,blockId就是shuffle的臨時的blockId,file就是shuffle合併後的檔案,partitionLengths表示每一個分割槽的大小。
依賴之分割槽排序器 -- ShuffleInMemorySorter
可以在任何記憶體使用的陣列--LongArray
它支援堆內記憶體和堆外記憶體,它有四個屬性:
數組裡的一個元素的地址等於:
if (baseObj == null) ? baseOffset(is real os address) + (length - 1) * WIDTH : address(baseObj) + baseOffset(is relative address 0) + (length - 1) * WIDTH
所有元素設為0:
設定元素
其底層使用unsafe類來設定值
獲取元素
其底層使用unsafe類來獲取值
記錄指標地址壓縮器 -- PackedRecordPointer
全稱:org.apache.spark.shuffle.sort.PackedRecordPointer
成員常量:
壓縮記錄指標和分割槽:
獲取記錄的地址:
獲取記錄的分割槽:
自定義比較器--SortComparator
思路也很簡單,就是根據分割槽來排序,即相同分割槽的資料被排到了一起。
遍歷自定義陣列的迭代器 -- ShuffleSorterIterator
其定義如下:
其思路很簡單,hasNext跟JDK標準庫的實現一致,多了一個loadNext,每次都需要把陣列中下一個位置的元素放到packetRecordPointer中,然後從packedRecordPointer中取出資料的地址和分割槽資訊。
獲取迭代器
獲取迭代器的原始碼如下:
其中 useRadixSort表示是否使用基數排序,預設是使用基數排序的,由引數 spark.shuffle.sort.useRadixSort 配置。
如果不使用基數排序,則會使用Spark的Sorter排序,sorter底層實現是TimSort,TimSort是優化之後的MergeSort。
總之,ShuffleSorterIterator中的資料已經是有序的了,只需要迭代式取出即可。
插入資料到自定義的陣列中
思路很簡單,插入的資料就是記錄的地址和分割槽資料,這兩種資料被PackedRecordPointer壓縮編碼之後被存入到陣列中。
繼承關係
其繼承關係如下:
即它是MemoryConsumer的子類,其實現了spill方法。
成員變數
其成員變數如下:
DISK_WRITE_BUFFER_SIZE:寫到磁碟前的緩衝區大小為1M
numPartitions:reduce的分割槽數
taskMemoryManager:負責任務的記憶體管理。看 spark 原始碼分析之二十二-- Task的記憶體管理 做進一步瞭解。
blockManager:Spark儲存系統的核心類。看 spark 原始碼分析之十八 -- Spark儲存體系剖析 做進一步瞭解。
TaskContext:任務執行的上下文物件。
numElementsForSpillThreshold:ShuffleInMemorySorter 資料溢位前的元素閥值。
fileBufferSizeBytes:DiskBlockObjectWriter溢位前的buffer大小。
diskWriteBufferSize:溢位到磁碟前的buffer大小。
allocatedPages:記錄分配的記憶體頁。
spills:記錄溢位資訊
peakMemoryUsedBytes:記憶體使用峰值。
inMemSorter:記憶體排序器
currentPage:當前使用記憶體頁
pageCursor:記憶體頁遊標,標誌在記憶體頁的位置。
構造方法
其構造方法如下:
fileBufferSizeBytes:通過引數 spark.shuffle.file.buffer 來配置,預設為 32k
numElementsForSpillThreshold:通過引數spark.shuffle.spill.numElementsForceSpillThreshold來配置,預設是整數的最大值。
diskWriteBufferSize:通過 spark.shuffle.spill.diskWriteBufferSize 來配置,預設為 1M
核心方法
主要方法如下:
我們主要分析其主要方法。
溢位操作
其原始碼如下:
思路很簡單,呼叫writeSortedFile將資料寫入到檔案中,釋放記憶體,重置inMemSorter。
freeMemory方法如下:
writeSortedFile 原始碼如下:
圖中,我大致把步驟劃分為四部分。核心步驟是第3步。
整體思路:遍歷sorter中的所有分割槽資料,最終同一分割槽的資料被寫入到同一個FileSegment中,這些FileSegment最終又構成了一個合併的檔案,其中FileSegment的大小被存放在SpillInfo中,最後放到了spills集合中。
插入記錄
其原始碼如下:
注意:它在插入資料之前,offset做了位元組對齊,如果系統支援對齊,則向後錯4位,否則向後錯8位。這跟溢位操作裡取資料是對應的,即可以跟上文中 writeSortedFile 方法對比看。
org.apache.spark.shuffle.sort.ShuffleExternalSorter#growPointerArrayIfNecessary原始碼如下:
解釋:首先hasSpaceForAnotherRecord會比較陣列中下一個寫的索引位置跟陣列的最大容量比較,如果索引位置大於最大容量,那麼就沒有空間來存放下一個記錄了,則需要把擴容,used是指的陣列現在使用的大小,擴容倍數為源陣列的一倍。
org.apache.spark.shuffle.sort.ShuffleExternalSorter#acquireNewPageIfNecessary 原始碼如下:
解釋:分配記憶體頁的條件是當前頁的遊標 + 需要的頁大小 大於當前頁的最大容量,則需要重新分配一個記憶體頁。
關閉並且獲取spill資訊
其原始碼如下:
思路:執行最後一次溢位,然後將資料溢位資訊返回。
清理資源
思路:釋放記憶體排序器的記憶體,刪除溢位的臨時檔案。
獲取記憶體使用峰值
原始碼如下:
思路:當前使用記憶體大於最大峰值則更新最大峰值,否則直接返回。
總結
這個sorter內部整合的記憶體sorter會把同一分割槽的資料排序到一起,資料溢位時,相同分割槽的資料會聚集到溢位檔案的一個segment中。
使用UnsafeShuffleWriter寫資料
先上原始碼,後解釋:
思路:流程很簡單,將所有的資料逐一遍歷放入sorter,然後將sorter關閉,獲取輸出檔案,結束。
下面我們具體來看每一步是具體怎麼實現的:
初始化Sorter
在org.apache.spark.shuffle.sort.UnsafeShuffleWriter的構造方法原始碼如下:
簡單做一下說明:
DEFAULT_INITIAL_SORT_BUFFER_SIZE為 4096
DEFAULT_INITIAL_SER_BUFFER_SIZE 大小為 1M
reduce 分割槽數量最大為 16777216
SHUFFLE_FILE_BUFFER_SIZE預設為32k,大小由引數 spark.shuffle.file.buffer 配置。
SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE 預設大小為32k,大小由引數 spark.shuffle.unsafe.file.output.buffer 配置。
其open方法如下:
這個方法裡涉及了三個類:ShuffleExternalSorter,MyByteArrayOutputStream以及SerializationStream三個類。ShuffleExternalSorter在上文已經剖析過了,MyByteArrayOutputStream是一個ByteArrayOutputStream子類負責想堆內記憶體中寫資料,SerializationStream是一個序列化之後的流,資料最終會被寫入到serBuffer記憶體流中,呼叫其flush方法後,其內部的buf就是寫入的資料,如下:
資料寫入概述
核心方法write原始碼如下:
其主要有兩步,一步是遍歷每一條記錄,將資料寫入到sorter中;第二步是關閉sorter,並將資料寫入到一個shuffle 檔案中同時更新shuffle索引資訊;最後清除shuffle過程中sorter使用的資源。
先來看第一步:資料寫入到sorter中。
資料插入到Sorter
記錄中的鍵值被序列化到serBuffer的buf位元組陣列中,然後被寫入到 sorter(ShuffleExternalSorter)中。在sorter中序列化資料被寫入到記憶體中(記憶體不足會溢位到磁碟中),其地址資訊被寫入到 ShuffleInMemorySorter 中,具體可以看上文介紹。
溢位檔案歸併為一個檔案
一步是遍歷每一條記錄,將資料寫入到sorter中後會呼叫sorter的closeAndGetSpills方法執行最後一次spill操作,然後獲取到整個shuffle過程中所有的SpillInfo資訊。然後使用ShuffleBlockResolver獲取到shuffle的blockId對應的shuffle檔案,最終呼叫mergeSpills 方法合併所有的溢位檔案到最終的shuffle檔案,然後更新shuffle索引檔案,設定Shuffle結果的MapStatus資訊,結束。
org.apache.spark.shuffle.sort.UnsafeShuffleWriter#closeAndWriteOutput 原始碼如下:
其關鍵方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 原始碼如下:
如果溢位檔案為0,直接返回全是0的分割槽陣列。
如果溢位檔案為1,檔案重新命名後返回只有一個元素的分割槽陣列。
如果溢位檔案多於1個則,多個溢位檔案開始merge。
首先先看一下五個變數:
encryptionEnabled:是否啟用加密,預設為false,通過 spark.io.encryption.enabled 引數來設定。
transferToEnabled:是否可以使用nio的transferTo傳輸,預設為true,通過 spark.file.transferTo 引數來設定。
compressionEnabled:是否使用壓縮,預設為true,通過 spark.shuffle.compress 引數來設定。
compressionCodec:預設壓縮類,預設為LZ4CompressionCodec,通過 spark.io.compression.codec 引數來設定。
fastMergeEnabled:是否啟用fast merge,預設為true,通過 spark.shuffle.unsafe.fastMergeEnabled 引數來設定。
fastMergeIsSupported:是否支援 fast merge,如果不使用壓縮或者是壓縮演算法是 org.apache.spark.io.SnappyCompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.ZStdCompressionCodec這四種支援連線的壓縮演算法中的一種都是可以使用 fast merge的。
三種merge多個檔案的方式:transfered-based fast merge、fileStream-based fast merge以及slow merge三種方式。
使用transfered-based fast merge條件:使用 fast merge並且壓縮演算法支援fast merge,並且啟用了nio的transferTo傳輸且不啟用檔案加密。
使用fileStream-based fast merge條件:使用 fast merge並且壓縮演算法支援fast merge,並且未啟用nio的transferTo傳輸或啟用了檔案加密。
使用slow merge條件:未使用 fast merge或壓縮演算法不支援fast merge。
下面我們來看三種合併溢位的方式。
transfered-based fast merge
其核心方法org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithTransferTo 原始碼如下:
其依賴方法 org.apache.spark.util.Utils#copyFileStreamNIO 如下:
很簡單,底層依賴於Java的NIO的transferTo方法實現。
fileStream-based fast merge
其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 原始碼如下,這裡不傳入任何壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 原始碼。
slow merge
其其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 原始碼跟 fileStream-based fast merge 裡的一樣,不做過多解釋,只不過這裡多傳入了一個壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 原始碼。
更新shuffle索引
這部分更詳細的可以看 org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit 原始碼。在上篇文章 spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 中使用BypassMergeSortShuffleWriter寫資料已經剖析過,不再剖析。
總結
ShuffleExternalSorter將資料不斷溢位到溢位小檔案中,溢位檔案內的資料是按分割槽規則排序的,分割槽內的資料是亂序的。
多個分割槽的資料同時溢位到一個溢位檔案,最後使用三種歸併方式將多個溢位檔案歸併到一個檔案,分割槽內的資料是亂序的。最終資料的格式跟第一種shuffle寫操作的結果是一樣的,即有分割槽的shuffle資料檔案和記錄分割槽大小的shuffle索引檔案。
&n