1. 程式人生 > 實用技巧 >spark 筆記2

spark 筆記2

一、Spark Shuffle 的發展

  • Spark 0.8及以前 Hash Based Shuffle
  • Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
  • Spark 0.9 引入ExternalAppendOnlyMap
  • Spark 1.1 引入Sort Based Shuffle,但預設仍為Hash Based Shuffle
  • Spark 1.2 預設的Shuffle方式改為Sort Based Shuffle
  • Spark 1.4 引入Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-sort併入Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出歷史舞臺
Spark Shuffle 機制總共有三種:

1. 未優化的 HashShuffle
2. 優化後 HashShuffle (引入了 Consolidation 機制)
3. Sort-Based Shuffle

由於 HashShuffle 會產生很多的磁碟檔案,引入 Consolidation 機制雖然在一定程度少了磁碟檔案數量,但是不足以有效提高 Shuffle 的效能,適合中小型資料規模的大資料處理。

Spark 2.3中,唯一的支援方式為 SortShuffleManagerSortShuffleManager

中定義了 writer reader 對應shuffle map reduce 階段。reader 只有一種實現 BlockStoreShuffleReaderwriter 有三種執行實現:

  • BypassMergeSortShuffleWriter:當前 shuffle 沒有聚合,並且分割槽數小於 spark.shuffle.sort.bypassMergeThreshold(預設200
  • UnsafeShuffleWriter:當條件不滿足 BypassMergeSortShuffleWriter 時, 並且當前 rdd 的資料支援序列化(即 UnsafeRowSerializer
    ),也不需要聚合, 分割槽數小於 2^24
  • SortShuffleWriter:其餘所有shufle

特點:

BypassMergeSortShuffle

  1. 演算法適用於沒有聚合,資料量不大的場景, BypassMergeSortShuffleWriter 所有的中間資料都是在磁盤裡,並沒有利用記憶體。而且它只保證分割槽索引的排序,而並不保證資料的排序

  2.和Hash Shuffle中的HashShuffleWriter實現基本一致, 唯一的區別在於,map端的多個輸出檔案會被彙總為一個檔案。 所有分割槽的資料會合併為同一個檔案,會生成一個索引檔案,是為了索引到每個分割槽的起始地址,可以隨機 access 某個partition的所有資料

SortShuffleWriter

  1. 會有不同的資料結構:PartitionedAppendOnlyMap(需要內部聚合),PartitionedPairBuffer 不需要內部聚合

  2.處理步驟:   

1. 使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在記憶體中進行排序,  排序的 K 是(partitionId, hash(key)) 這樣一個元組。

2. 如果超過記憶體 limit, 我 spill 到一個檔案中,這個檔案中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根據 hash(key)進行比較排序

3. 如果需要輸出全域性有序的檔案的時候,就需要對之前所有的輸出檔案 和 當前記憶體中的資料結構中的資料進行  merge sort, 進行全域性排序

UnsafeShuffleWriter

  1. 觸發條件:Serializer 支援 relocation,

  2. 沒有指定 aggregation 或者 key 排序,

  3.partition 數量不能大於指定的閾值(2^24),因為 partition number 使用24bit 表示的

  4. 特點:原始資料首先被序列化處理,並且再也不需要反序列,在其對應的元資料被排序後,需要Serializer支援relocation,在指定位置讀取對應資料

小結:

下圖是相關的uml圖

ShuffleHandle類 會儲存shuffle writer演算法需要的資訊。根據ShuffleHandle的型別,來選擇ShuffleWriter的型別。

ShuffleWriter負責在map端生成中間資料,ShuffleReader負責在reduce端讀取和整合中間資料。

ShuffleManager 提供了registerShuffle方法,根據shuffle的dependency情況,選擇出哪種ShuffleHandler。它對於不同的ShuffleHandler,有著不同的條件

  • BypassMergeSortShuffleHandle : 該shuffle不需要聚合,並且reduce端的分割槽數目小於配置項spark.shuffle.sort.bypassMergeThreshold,預設為200
  • SerializedShuffleHandle : 該shuffle不需要聚合,並且必須支援序列化時seek位置,還需要reduce端的分割槽數目小於16777216(1 << 24 + 1)
  • BaseShuffleHandle : 其餘情況

getWriter方法會根據registerShuffle方法返回的ShuffleHandler,選擇出哪種 shuffle writer,原理比較簡單:

  • 如果是BypassMergeSortShuffleHandle, 則選擇BypassMergeSortShuffleWriter

  • 如果是SerializedShuffleHandle, 則選擇UnsafeShuffleWriter

  • 如果是BaseShuffleHandle, 則選擇SortShuffleWriter

ShuffleWriter只有兩個方法,write和stop方法。使用者首先呼叫write方法,新增資料,完成排序,最後呼叫stop方法,返回MapStatus結果。下面依次介紹ShuffleWriter的三個子類。


Spark MapOutputTracker 原理

Spark的shuffle過程分為writer和reader兩塊。 writer負責生成中間資料,reader負責整合中間資料。而中間資料的元資訊,則由MapOutputTracker負責管理。 它負責writer和reader的溝通。

shuffle writer會將中間資料儲存到Block裡面,然後將資料的位置傳送給MapOutputTracker。

shuffle reader通過向 MapOutputTracker獲取中間資料的位置之後,才能讀取到資料。

參考引用:

https://zhmin.github.io/2019/01/26/spark-shuffle-writer/