Spark Tungsten-sort Based Shuffle 分析
Tungsten-sort 算不得一個全新的shuffle 方案,它在特定場景下基於類似現有的Sort Based Shuffle處理流程,對記憶體/CPU/Cache使用做了非常大的優化。帶來高效的同時,也就限定了自己的使用場景。如果Tungsten-sort 發現自己無法處理,則會自動使用 Sort Based Shuffle進行處理。
前言
Tungsten 中文是鎢絲
的意思。 Tungsten Project 是 Databricks 公司提出的對Spark優化記憶體和CPU使用的計劃,該計劃初期似乎對Spark SQL優化的最多。不過部分RDD API 還有Shuffle也因此受益。
簡述
Tungsten-sort優化點主要在三個方面:
- 直接在serialized binary data上sort而不是java objects,減少了memory的開銷和GC的overhead。
- 提供cache-efficient sorter,使用一個8bytes的指標,把排序轉化成了一個指標陣列的排序。
- spill的merge過程也無需反序列化即可完成
這些優化的實現導致引入了一個新的記憶體管理模型,類似OS的Page,對應的實際資料結構為MemoryBlock
,支援off-heap 以及 in-heap 兩種模式。為了能夠對Record 在這些MemoryBlock進行定位,引入了Pointer(指標)的概念。
如果你還記得Sort Based Shuffle裡儲存資料的物件PartitionedAppendOnlyMap
,這是一個放在JVM heap裡普通物件,在Tungsten-sort中,他被替換成了類似作業系統記憶體頁的物件。如果你無法申請到新的Page,這個時候就要執行spill操作,也就是寫入到磁碟的操作。具體觸發條件,和Sort Based Shuffle 也是類似的。
開啟條件
Spark 預設開啟的是Sort Based Shuffle,想要開啟Tungsten-sort ,請設定
spark.shuffle.manager=tungsten-sort
對應的實現類是:
org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
名字的來源是因為使用了大量JDK Sun Unsafe API。
當且僅當下面條件都滿足時,才會使用新的Shuffle方式:
- Shuffle dependency 不能帶有aggregation 或者輸出需要排序
- Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL’s 自定義的一些序列化方式.
- Shuffle 檔案的數量不能大於 16777216
- 序列化時,單條記錄不能大於 128 MB
可以看到,能使用的條件還是挺苛刻的。
這些限制來源於哪裡
參看如下程式碼,page的大小:
this.pageSizeBytes = (int) Math.min(
PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
shuffleMemoryManager.pageSizeBytes());
這就保證了頁大小不超過PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES
的值,該值就被定義成了128M。
而產生這個限制的具體設計原因,我們還要仔細分析下Tungsten的記憶體模型:
https://github.com/hustnn/TungstenSecret/tree/master” title=”” />
這張圖其實畫的是 on-heap 的記憶體邏輯圖,其中 #Page 部分為13bit, Offset 為51bit,你會發現 2^51 >>128M的。但是在Shuffle的過程中,對51bit 做了壓縮,使用了27bit,具體如下:
[24 bit partition number][13 bit memory page number][27 bit offset in page]
這裡預留出的24bit給了partition number,為了後面的排序用。上面的好幾個限制其實都是因為這個指標引起的:
- 一個是partition 的限制,前面的數字
16777216
就是來源於partition number 使用24bit 表示的。 - 第二個是page number
- 第三個是偏移量,最大能表示到2^27=128M。那一個task 能管理到的記憶體是受限於這個指標的,最多是 2^13 * 128M 也就是1TB左右。
有了這個指標,我們就可以定位和管理到off-heap 或者 on-heap裡的記憶體了。這個模型還是很漂亮的,記憶體管理也非常高效,記得之前的預估PartitionedAppendOnlyMap
的記憶體是非常困難的,但是通過現在的記憶體管理機制,是非常快速並且精確的。
對於第一個限制,那是因為後續Shuffle Write的sort 部分,只對前面24bit的partiton number 進行排序,key的值沒有被編碼到這個指標,所以沒辦法進行ordering
同時,因為整個過程是追求不反序列化的,所以不能做aggregation。
Shuffle Write
核心類:
org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter
資料會通過 UnsafeShuffleExternalSorter.insertRecordIntoSorter
一條一條寫入到 serOutputStream
序列化輸出流。
這裡消耗記憶體的地方是
serBuffer = new MyByteArrayOutputStream(1024 * 1024)
預設是1M,類似於Sort Based Shuffle 中的ExternalSorter
,在Tungsten Sort 對應的為UnsafeShuffleExternalSorter
,記錄序列化後就通過sorter.insertRecord
方法放到sorter裡去了。
這裡sorter 負責申請Page,釋放Page,判斷是否要進行spill都這個類裡完成。程式碼的架子其實和Sort Based 是一樣的。
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-core/” title=”” />
(另外,值得注意的是,這張圖裡進行spill操作的同時檢查記憶體可用而導致的Exeception 的bug 已經在1.5.1版本被修復了,忽略那條路徑)
記憶體是否充足的條件依然shuffleMemoryManager
來決定,也就是所有task shuffle 申請的Page記憶體總和不能大於下面的值:
ExecutorHeapMemeory * 0.2 * 0.8
上面的數字可通過下面兩個配置來更改:
spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8
UnsafeShuffleExternalSorter 負責申請記憶體,並且會生成該條記錄最後的邏輯地址,也就前面提到的 Pointer。
接著Record 會繼續流轉到UnsafeShuffleInMemorySorter
中,這個物件維護了一個指標陣列:
private long[] pointerArray;
陣列的初始大小為 4096,後續如果不夠了,則按每次兩倍大小進行擴充。
假設100萬條記錄,那麼該陣列大約是8M 左右,所以其實還是很小的。一旦spill後該UnsafeShuffleInMemorySorter
就會被賦為null,被回收掉。
我們回過頭來看spill,其實邏輯上也異常簡單了,UnsafeShuffleInMemorySorter
會返回一個迭代器,該迭代器粒度每個元素就是一個指標,然後到根據該指標可以拿到真實的record,然後寫入到磁碟,因為這些record 在一開始進入UnsafeShuffleExternalSorter
就已經被序列化了,所以在這裡就純粹變成寫位元組陣列了。形成的結構依然和Sort Based Shuffle 一致,一個檔案裡不同的partiton的資料用fileSegment來表示,對應的資訊存在一個index檔案裡。
另外寫檔案的時候也需要一個 buffer :
spark.shuffle.file.buffer = 32k
另外從記憶體裡拿到資料放到DiskWriter,這中間還要有個中轉,是通過
final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];
來完成的,都是記憶體,所以很快。
Task結束前,我們要做一次mergeSpills
操作,然後形成一個shuffle 檔案。這裡面其實也挺複雜的,
如果開啟了
`spark.shuffle.unsafe.fastMergeEnabled=true`
並且沒有開啟
`spark.shuffle.compress=true`
或者壓縮方式為:
LZFCompressionCodec
則可以非常高效的進行合併,叫做transferTo
。不過無論是什麼合併,都不需要進行反序列化。
Shuffle Read
Shuffle Read 完全複用HashShuffleReader
,具體參看 Sort-Based Shuffle。
總結
我個人感覺,Tungsten-sort 實現了記憶體的自主管理,管理方式模擬了作業系統的方式,通過Page可以使得大量的record被順序儲存在記憶體,整個shuffle write 排序的過程只需要對指標進行運算(二進位制排序),並且無需反序列化,整個過程非常高效,對於減少GC,提高記憶體訪問效率,提高CPU使用效率確實帶來了明顯的提升。