1. 程式人生 > >Spark Tungsten-sort Based Shuffle 分析

Spark Tungsten-sort Based Shuffle 分析

Tungsten-sort 算不得一個全新的shuffle 方案,它在特定場景下基於類似現有的Sort Based Shuffle處理流程,對記憶體/CPU/Cache使用做了非常大的優化。帶來高效的同時,也就限定了自己的使用場景。如果Tungsten-sort 發現自己無法處理,則會自動使用 Sort Based Shuffle進行處理。

前言

Tungsten 中文是鎢絲的意思。 Tungsten Project 是 Databricks 公司提出的對Spark優化記憶體和CPU使用的計劃,該計劃初期似乎對Spark SQL優化的最多。不過部分RDD API 還有Shuffle也因此受益。

簡述

Tungsten-sort優化點主要在三個方面:

  1. 直接在serialized binary data上sort而不是java objects,減少了memory的開銷和GC的overhead。
  2. 提供cache-efficient sorter,使用一個8bytes的指標,把排序轉化成了一個指標陣列的排序。
  3. spill的merge過程也無需反序列化即可完成

這些優化的實現導致引入了一個新的記憶體管理模型,類似OS的Page,對應的實際資料結構為MemoryBlock,支援off-heap 以及 in-heap 兩種模式。為了能夠對Record 在這些MemoryBlock進行定位,引入了Pointer(指標)的概念。

如果你還記得Sort Based Shuffle裡儲存資料的物件PartitionedAppendOnlyMap,這是一個放在JVM heap裡普通物件,在Tungsten-sort中,他被替換成了類似作業系統記憶體頁的物件。如果你無法申請到新的Page,這個時候就要執行spill操作,也就是寫入到磁碟的操作。具體觸發條件,和Sort Based Shuffle 也是類似的。

開啟條件

Spark 預設開啟的是Sort Based Shuffle,想要開啟Tungsten-sort ,請設定

spark.shuffle.manager=tungsten-sort

對應的實現類是:

org.apache.spark.shuffle.unsafe.UnsafeShuffleManager

名字的來源是因為使用了大量JDK Sun Unsafe API。

當且僅當下面條件都滿足時,才會使用新的Shuffle方式:

  • Shuffle dependency 不能帶有aggregation 或者輸出需要排序
  • Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL’s 自定義的一些序列化方式.
  • Shuffle 檔案的數量不能大於 16777216
  • 序列化時,單條記錄不能大於 128 MB

可以看到,能使用的條件還是挺苛刻的。

這些限制來源於哪裡

參看如下程式碼,page的大小:

this.pageSizeBytes = (int) Math.min(  
                PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, 
                shuffleMemoryManager.pageSizeBytes());

這就保證了頁大小不超過PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,該值就被定義成了128M。

而產生這個限制的具體設計原因,我們還要仔細分析下Tungsten的記憶體模型:

來源於:<a href=https://github.com/hustnn/TungstenSecret/tree/master” title=”” />

這張圖其實畫的是 on-heap 的記憶體邏輯圖,其中 #Page 部分為13bit, Offset 為51bit,你會發現 2^51 >>128M的。但是在Shuffle的過程中,對51bit 做了壓縮,使用了27bit,具體如下:

 [24 bit partition number][13 bit memory page number][27 bit offset in page]

這裡預留出的24bit給了partition number,為了後面的排序用。上面的好幾個限制其實都是因為這個指標引起的:

  1. 一個是partition 的限制,前面的數字 16777216 就是來源於partition number 使用24bit 表示的。
  2. 第二個是page number
  3. 第三個是偏移量,最大能表示到2^27=128M。那一個task 能管理到的記憶體是受限於這個指標的,最多是 2^13 * 128M 也就是1TB左右。

有了這個指標,我們就可以定位和管理到off-heap 或者 on-heap裡的記憶體了。這個模型還是很漂亮的,記憶體管理也非常高效,記得之前的預估PartitionedAppendOnlyMap的記憶體是非常困難的,但是通過現在的記憶體管理機制,是非常快速並且精確的。

對於第一個限制,那是因為後續Shuffle Write的sort 部分,只對前面24bit的partiton number 進行排序,key的值沒有被編碼到這個指標,所以沒辦法進行ordering

同時,因為整個過程是追求不反序列化的,所以不能做aggregation。

Shuffle Write

核心類:

 org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter

資料會通過 UnsafeShuffleExternalSorter.insertRecordIntoSorter 一條一條寫入到 serOutputStream 序列化輸出流。

這裡消耗記憶體的地方是

 serBuffer = new MyByteArrayOutputStream(1024 * 1024)

預設是1M,類似於Sort Based Shuffle 中的ExternalSorter,在Tungsten Sort 對應的為UnsafeShuffleExternalSorter,記錄序列化後就通過sorter.insertRecord方法放到sorter裡去了。

這裡sorter 負責申請Page,釋放Page,判斷是否要進行spill都這個類裡完成。程式碼的架子其實和Sort Based 是一樣的。
圖片來源:<a href=https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-core/” title=”” />

(另外,值得注意的是,這張圖裡進行spill操作的同時檢查記憶體可用而導致的Exeception 的bug 已經在1.5.1版本被修復了,忽略那條路徑)

記憶體是否充足的條件依然shuffleMemoryManager 來決定,也就是所有task shuffle 申請的Page記憶體總和不能大於下面的值:

 ExecutorHeapMemeory * 0.2 * 0.8

上面的數字可通過下面兩個配置來更改:

spark.shuffle.memoryFraction=0.2
spark.shuffle.safetyFraction=0.8

UnsafeShuffleExternalSorter 負責申請記憶體,並且會生成該條記錄最後的邏輯地址,也就前面提到的 Pointer。

接著Record 會繼續流轉到UnsafeShuffleInMemorySorter中,這個物件維護了一個指標陣列:

  private long[] pointerArray;

陣列的初始大小為 4096,後續如果不夠了,則按每次兩倍大小進行擴充。

假設100萬條記錄,那麼該陣列大約是8M 左右,所以其實還是很小的。一旦spill後該UnsafeShuffleInMemorySorter就會被賦為null,被回收掉。

我們回過頭來看spill,其實邏輯上也異常簡單了,UnsafeShuffleInMemorySorter 會返回一個迭代器,該迭代器粒度每個元素就是一個指標,然後到根據該指標可以拿到真實的record,然後寫入到磁碟,因為這些record 在一開始進入UnsafeShuffleExternalSorter 就已經被序列化了,所以在這裡就純粹變成寫位元組陣列了。形成的結構依然和Sort Based Shuffle 一致,一個檔案裡不同的partiton的資料用fileSegment來表示,對應的資訊存在一個index檔案裡。

另外寫檔案的時候也需要一個 buffer :

 spark.shuffle.file.buffer = 32k

另外從記憶體裡拿到資料放到DiskWriter,這中間還要有個中轉,是通過

 final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];

來完成的,都是記憶體,所以很快。

Task結束前,我們要做一次mergeSpills操作,然後形成一個shuffle 檔案。這裡面其實也挺複雜的,
如果開啟了

 `spark.shuffle.unsafe.fastMergeEnabled=true`

並且沒有開啟

`spark.shuffle.compress=true`

或者壓縮方式為:

 LZFCompressionCodec

則可以非常高效的進行合併,叫做transferTo。不過無論是什麼合併,都不需要進行反序列化。

Shuffle Read

Shuffle Read 完全複用HashShuffleReader,具體參看 Sort-Based Shuffle。

總結

我個人感覺,Tungsten-sort 實現了記憶體的自主管理,管理方式模擬了作業系統的方式,通過Page可以使得大量的record被順序儲存在記憶體,整個shuffle write 排序的過程只需要對指標進行運算(二進位制排序),並且無需反序列化,整個過程非常高效,對於減少GC,提高記憶體訪問效率,提高CPU使用效率確實帶來了明顯的提升。