spark 筆記2
一、Spark Shuffle 的發展
- Spark 0.8及以前 Hash Based Shuffle
- Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
- Spark 0.9 引入ExternalAppendOnlyMap
- Spark 1.1 引入Sort Based Shuffle,但預設仍為Hash Based Shuffle
- Spark 1.2 預設的Shuffle方式改為Sort Based Shuffle
- Spark 1.4 引入Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-sort併入Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle退出歷史舞臺
1. 未優化的 HashShuffle 2. 優化後 HashShuffle (引入了 Consolidation 機制) 3. Sort-Based Shuffle
由於 HashShuffle 會產生很多的磁碟檔案,引入 Consolidation 機制雖然在一定程度少了磁碟檔案數量,但是不足以有效提高 Shuffle 的效能,適合中小型資料規模的大資料處理。
Spark 2.3中,唯一的支援方式為 SortShuffleManager,SortShuffleManager
- BypassMergeSortShuffleWriter:當前 shuffle 沒有聚合,並且分割槽數小於 spark.shuffle.sort.bypassMergeThreshold(預設200)
- UnsafeShuffleWriter:當條件不滿足 BypassMergeSortShuffleWriter 時, 並且當前 rdd 的資料支援序列化(即 UnsafeRowSerializer
- SortShuffleWriter:其餘所有shufle
特點:
BypassMergeSortShuffle
1. 演算法適用於沒有聚合,資料量不大的場景, BypassMergeSortShuffleWriter 所有的中間資料都是在磁盤裡,並沒有利用記憶體。而且它只保證分割槽索引的排序,而並不保證資料的排序
2.和Hash Shuffle中的HashShuffleWriter實現基本一致, 唯一的區別在於,map端的多個輸出檔案會被彙總為一個檔案。 所有分割槽的資料會合併為同一個檔案,會生成一個索引檔案,是為了索引到每個分割槽的起始地址,可以隨機 access 某個partition的所有資料
SortShuffleWriter
1. 會有不同的資料結構:PartitionedAppendOnlyMap(需要內部聚合),PartitionedPairBuffer 不需要內部聚合
2.處理步驟:
1. 使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在記憶體中進行排序, 排序的 K 是(partitionId, hash(key)) 這樣一個元組。
2. 如果超過記憶體 limit, 我 spill 到一個檔案中,這個檔案中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根據 hash(key)進行比較排序
3. 如果需要輸出全域性有序的檔案的時候,就需要對之前所有的輸出檔案 和 當前記憶體中的資料結構中的資料進行 merge sort, 進行全域性排序
UnsafeShuffleWriter
1. 觸發條件:Serializer 支援 relocation,
2. 沒有指定 aggregation 或者 key 排序,
3.partition 數量不能大於指定的閾值(2^24),因為 partition number 使用24bit 表示的
4. 特點:原始資料首先被序列化處理,並且再也不需要反序列,在其對應的元資料被排序後,需要Serializer支援relocation,在指定位置讀取對應資料
小結:
下圖是相關的uml圖
ShuffleHandle類 會儲存shuffle writer演算法需要的資訊。根據ShuffleHandle的型別,來選擇ShuffleWriter的型別。
ShuffleWriter負責在map端生成中間資料,ShuffleReader負責在reduce端讀取和整合中間資料。
ShuffleManager 提供了registerShuffle方法,根據shuffle的dependency情況,選擇出哪種ShuffleHandler。它對於不同的ShuffleHandler,有著不同的條件
- BypassMergeSortShuffleHandle : 該shuffle不需要聚合,並且reduce端的分割槽數目小於配置項spark.shuffle.sort.bypassMergeThreshold,預設為200
- SerializedShuffleHandle : 該shuffle不需要聚合,並且必須支援序列化時seek位置,還需要reduce端的分割槽數目小於16777216(1 << 24 + 1)
- BaseShuffleHandle : 其餘情況
getWriter方法會根據registerShuffle方法返回的ShuffleHandler,選擇出哪種 shuffle writer,原理比較簡單:
-
如果是BypassMergeSortShuffleHandle, 則選擇BypassMergeSortShuffleWriter
-
如果是SerializedShuffleHandle, 則選擇UnsafeShuffleWriter
-
如果是BaseShuffleHandle, 則選擇SortShuffleWriter
ShuffleWriter只有兩個方法,write和stop方法。使用者首先呼叫write方法,新增資料,完成排序,最後呼叫stop方法,返回MapStatus結果。下面依次介紹ShuffleWriter的三個子類。
Spark MapOutputTracker 原理
Spark的shuffle過程分為writer和reader兩塊。 writer負責生成中間資料,reader負責整合中間資料。而中間資料的元資訊,則由MapOutputTracker負責管理。 它負責writer和reader的溝通。
shuffle writer會將中間資料儲存到Block裡面,然後將資料的位置傳送給MapOutputTracker。
shuffle reader通過向 MapOutputTracker獲取中間資料的位置之後,才能讀取到資料。
參考引用:
https://zhmin.github.io/2019/01/26/spark-shuffle-writer/