1. 程式人生 > >spark shuffle寫操作三部曲之UnsafeShuffleWriter

spark shuffle寫操作三部曲之UnsafeShuffleWriter

前言

在前兩篇文章 spark shuffle的寫操作之準備工作 中引出了spark shuffle的三種實現,spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 講述了BypassMergeSortShuffleWriter 用於shuffle寫操作的具體細節,實現相對比較樸素,值得學習。本篇文章,主要剖析了 UnsafeShuffleWriter用作寫shuffle資料的具體細節,它在 BypassMergeSortShuffleWriter 的思路上更進一步,建議先看 spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter,再來看本篇文章。下面先來看UnsafeShuffleWriter的主要依賴實現類 -- ShuffleExternalSorter。

sort-based shuffle的外部sorter -- ShuffleExternalSorter

在看本小節之前,建議先參照 spark 原始碼分析之二十二-- Task的記憶體管理 對任務的記憶體管理做一下詳細的瞭解,因為ShuffleExternalSorter使用了記憶體的排序。任務在做大資料量的記憶體操作時,記憶體是需要管理的。

在正式剖析之前,先剖析其依賴類。

依賴之記錄block元資訊-- SpillInfo

它記錄了block的一些元資料資訊。

其類結構如下:

其中,blockId就是shuffle的臨時的blockId,file就是shuffle合併後的檔案,partitionLengths表示每一個分割槽的大小。

依賴之分割槽排序器 -- ShuffleInMemorySorter

可以在任何記憶體使用的陣列--LongArray

它支援堆內記憶體和堆外記憶體,它有四個屬性:

數組裡的一個元素的地址等於:

if (baseObj  == null) ? baseOffset(is real os address) + (length - 1) * WIDTH : address(baseObj) +  baseOffset(is relative address 0) + (length - 1) * WIDTH

所有元素設為0:

設定元素

其底層使用unsafe類來設定值

獲取元素

其底層使用unsafe類來獲取值

記錄指標地址壓縮器 -- PackedRecordPointer

全稱:org.apache.spark.shuffle.sort.PackedRecordPointer

成員常量:

壓縮記錄指標和分割槽:

獲取記錄的地址:

獲取記錄的分割槽:

自定義比較器--SortComparator

思路也很簡單,就是根據分割槽來排序,即相同分割槽的資料被排到了一起。

遍歷自定義陣列的迭代器 -- ShuffleSorterIterator

其定義如下:

其思路很簡單,hasNext跟JDK標準庫的實現一致,多了一個loadNext,每次都需要把陣列中下一個位置的元素放到packetRecordPointer中,然後從packedRecordPointer中取出資料的地址和分割槽資訊。

獲取迭代器

獲取迭代器的原始碼如下:

其中 useRadixSort表示是否使用基數排序,預設是使用基數排序的,由引數 spark.shuffle.sort.useRadixSort 配置。

如果不使用基數排序,則會使用Spark的Sorter排序,sorter底層實現是TimSort,TimSort是優化之後的MergeSort。

總之,ShuffleSorterIterator中的資料已經是有序的了,只需要迭代式取出即可。

插入資料到自定義的陣列中

思路很簡單,插入的資料就是記錄的地址和分割槽資料,這兩種資料被PackedRecordPointer壓縮編碼之後被存入到陣列中。

 

繼承關係

其繼承關係如下:

即它是MemoryConsumer的子類,其實現了spill方法。

成員變數

其成員變數如下:

DISK_WRITE_BUFFER_SIZE:寫到磁碟前的緩衝區大小為1M

numPartitions:reduce的分割槽數

taskMemoryManager:負責任務的記憶體管理。看 spark 原始碼分析之二十二-- Task的記憶體管理 做進一步瞭解。

blockManager:Spark儲存系統的核心類。看 spark 原始碼分析之十八 -- Spark儲存體系剖析 做進一步瞭解。

TaskContext:任務執行的上下文物件。

numElementsForSpillThreshold:ShuffleInMemorySorter 資料溢位前的元素閥值。

fileBufferSizeBytes:DiskBlockObjectWriter溢位前的buffer大小。

diskWriteBufferSize:溢位到磁碟前的buffer大小。

allocatedPages:記錄分配的記憶體頁。

spills:記錄溢位資訊

peakMemoryUsedBytes:記憶體使用峰值。

inMemSorter:記憶體排序器

currentPage:當前使用記憶體頁

pageCursor:記憶體頁遊標,標誌在記憶體頁的位置。

構造方法

其構造方法如下:

fileBufferSizeBytes:通過引數 spark.shuffle.file.buffer 來配置,預設為 32k

numElementsForSpillThreshold:通過引數spark.shuffle.spill.numElementsForceSpillThreshold來配置,預設是整數的最大值。

diskWriteBufferSize:通過 spark.shuffle.spill.diskWriteBufferSize 來配置,預設為 1M

核心方法

主要方法如下:

我們主要分析其主要方法。

溢位操作

其原始碼如下:

思路很簡單,呼叫writeSortedFile將資料寫入到檔案中,釋放記憶體,重置inMemSorter。

freeMemory方法如下:

 

writeSortedFile 原始碼如下:

圖中,我大致把步驟劃分為四部分。核心步驟是第3步。

整體思路:遍歷sorter中的所有分割槽資料,最終同一分割槽的資料被寫入到同一個FileSegment中,這些FileSegment最終又構成了一個合併的檔案,其中FileSegment的大小被存放在SpillInfo中,最後放到了spills集合中。

 

插入記錄

其原始碼如下:

注意:它在插入資料之前,offset做了位元組對齊,如果系統支援對齊,則向後錯4位,否則向後錯8位。這跟溢位操作裡取資料是對應的,即可以跟上文中 writeSortedFile 方法對比看。

org.apache.spark.shuffle.sort.ShuffleExternalSorter#growPointerArrayIfNecessary原始碼如下:

解釋:首先hasSpaceForAnotherRecord會比較陣列中下一個寫的索引位置跟陣列的最大容量比較,如果索引位置大於最大容量,那麼就沒有空間來存放下一個記錄了,則需要把擴容,used是指的陣列現在使用的大小,擴容倍數為源陣列的一倍。

org.apache.spark.shuffle.sort.ShuffleExternalSorter#acquireNewPageIfNecessary 原始碼如下:

解釋:分配記憶體頁的條件是當前頁的遊標 + 需要的頁大小 大於當前頁的最大容量,則需要重新分配一個記憶體頁。

 

關閉並且獲取spill資訊

其原始碼如下:

思路:執行最後一次溢位,然後將資料溢位資訊返回。

 

清理資源

 

思路:釋放記憶體排序器的記憶體,刪除溢位的臨時檔案。

 

獲取記憶體使用峰值

原始碼如下:

思路:當前使用記憶體大於最大峰值則更新最大峰值,否則直接返回。

總結

這個sorter內部整合的記憶體sorter會把同一分割槽的資料排序到一起,資料溢位時,相同分割槽的資料會聚集到溢位檔案的一個segment中。

 

 

使用UnsafeShuffleWriter寫資料

先上原始碼,後解釋:

 

思路:流程很簡單,將所有的資料逐一遍歷放入sorter,然後將sorter關閉,獲取輸出檔案,結束。

下面我們具體來看每一步是具體怎麼實現的:

初始化Sorter

在org.apache.spark.shuffle.sort.UnsafeShuffleWriter的構造方法原始碼如下:

簡單做一下說明:

DEFAULT_INITIAL_SORT_BUFFER_SIZE為 4096

DEFAULT_INITIAL_SER_BUFFER_SIZE 大小為 1M

reduce 分割槽數量最大為 16777216

SHUFFLE_FILE_BUFFER_SIZE預設為32k,大小由引數 spark.shuffle.file.buffer 配置。

SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE 預設大小為32k,大小由引數 spark.shuffle.unsafe.file.output.buffer 配置。

其open方法如下:

這個方法裡涉及了三個類:ShuffleExternalSorter,MyByteArrayOutputStream以及SerializationStream三個類。ShuffleExternalSorter在上文已經剖析過了,MyByteArrayOutputStream是一個ByteArrayOutputStream子類負責想堆內記憶體中寫資料,SerializationStream是一個序列化之後的流,資料最終會被寫入到serBuffer記憶體流中,呼叫其flush方法後,其內部的buf就是寫入的資料,如下:

 

資料寫入概述

核心方法write原始碼如下:

其主要有兩步,一步是遍歷每一條記錄,將資料寫入到sorter中;第二步是關閉sorter,並將資料寫入到一個shuffle 檔案中同時更新shuffle索引資訊;最後清除shuffle過程中sorter使用的資源。

先來看第一步:資料寫入到sorter中。

資料插入到Sorter

記錄中的鍵值被序列化到serBuffer的buf位元組陣列中,然後被寫入到 sorter(ShuffleExternalSorter)中。在sorter中序列化資料被寫入到記憶體中(記憶體不足會溢位到磁碟中),其地址資訊被寫入到 ShuffleInMemorySorter 中,具體可以看上文介紹。

溢位檔案歸併為一個檔案

一步是遍歷每一條記錄,將資料寫入到sorter中後會呼叫sorter的closeAndGetSpills方法執行最後一次spill操作,然後獲取到整個shuffle過程中所有的SpillInfo資訊。然後使用ShuffleBlockResolver獲取到shuffle的blockId對應的shuffle檔案,最終呼叫mergeSpills 方法合併所有的溢位檔案到最終的shuffle檔案,然後更新shuffle索引檔案,設定Shuffle結果的MapStatus資訊,結束。

org.apache.spark.shuffle.sort.UnsafeShuffleWriter#closeAndWriteOutput 原始碼如下:

其關鍵方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 原始碼如下:

如果溢位檔案為0,直接返回全是0的分割槽陣列。

如果溢位檔案為1,檔案重新命名後返回只有一個元素的分割槽陣列。

如果溢位檔案多於1個則,多個溢位檔案開始merge。

 

首先先看一下五個變數:

encryptionEnabled:是否啟用加密,預設為false,通過 spark.io.encryption.enabled 引數來設定。

transferToEnabled:是否可以使用nio的transferTo傳輸,預設為true,通過 spark.file.transferTo 引數來設定。

compressionEnabled:是否使用壓縮,預設為true,通過 spark.shuffle.compress 引數來設定。

compressionCodec:預設壓縮類,預設為LZ4CompressionCodec,通過 spark.io.compression.codec 引數來設定。

fastMergeEnabled:是否啟用fast merge,預設為true,通過 spark.shuffle.unsafe.fastMergeEnabled 引數來設定。

fastMergeIsSupported:是否支援 fast merge,如果不使用壓縮或者是壓縮演算法是 org.apache.spark.io.SnappyCompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.ZStdCompressionCodec這四種支援連線的壓縮演算法中的一種都是可以使用 fast merge的。

 

三種merge多個檔案的方式:transfered-based fast merge、fileStream-based fast merge以及slow merge三種方式。

使用transfered-based fast merge條件:使用 fast merge並且壓縮演算法支援fast merge,並且啟用了nio的transferTo傳輸且不啟用檔案加密。

使用fileStream-based fast merge條件:使用 fast merge並且壓縮演算法支援fast merge,並且未啟用nio的transferTo傳輸或啟用了檔案加密。

使用slow merge條件:未使用 fast merge或壓縮演算法不支援fast merge。

下面我們來看三種合併溢位的方式。

transfered-based fast merge

其核心方法org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithTransferTo 原始碼如下:

其依賴方法 org.apache.spark.util.Utils#copyFileStreamNIO 如下:

很簡單,底層依賴於Java的NIO的transferTo方法實現。

fileStream-based fast merge

其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 原始碼如下,這裡不傳入任何壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 原始碼。

slow merge

其其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 原始碼跟 fileStream-based fast merge 裡的一樣,不做過多解釋,只不過這裡多傳入了一個壓縮類,見  org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 原始碼。

更新shuffle索引

這部分更詳細的可以看 org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit 原始碼。在上篇文章 spark shuffle寫操作三部曲之BypassMergeSortShuffleWriter 中使用BypassMergeSortShuffleWriter寫資料已經剖析過,不再剖析。

總結

ShuffleExternalSorter將資料不斷溢位到溢位小檔案中,溢位檔案內的資料是按分割槽規則排序的,分割槽內的資料是亂序的。

多個分割槽的資料同時溢位到一個溢位檔案,最後使用三種歸併方式將多個溢位檔案歸併到一個檔案,分割槽內的資料是亂序的。最終資料的格式跟第一種shuffle寫操作的結果是一樣的,即有分割槽的shuffle資料檔案和記錄分割槽大小的shuffle索引檔案。

&n