圖解Spark Shuffle的發展歷程
一、Spark Hash Shuffle
基於Hash的Shuffle Write操作較為簡單,這種Shuffle方式中,Shuffle Map Task會根據下游生成的Partition個數來建立中間檔案來儲存對應的Partition資料。如下圖所示,下游生成3個Partition,此時每個Shuffle Map Task會生成3箇中間檔案來儲存3個Partition中的資料。如一個Shuffle Map Task在處理資料<1, 1>, <2, 1>, <3, 1>時,會分別將<1, 1>, <2, 2>, <3,3>寫入到Partition 1(1 % 3 = 1), Partition 2(2 % 3 = 2)和Partition 0(3 % 3 = 0)對應的檔案中。
基於Hash的Shuffle Write操作雖然較為簡單,但是帶來的問題是會建立大量的中間檔案,假設Shuffle Map Task個數記為 M, 下游生成的Partition個數為 P,此時生成的中間檔案個數為 M * P。而對於這種Shuffle中間檔案比較小的情況,多次的磁碟IO開銷是不容忽視的。
二、Spark Consolidate Shuffle
Spark Consolidate Shuffle是對Spark Hash Shuffle的優化,相比較於Spark Hash Shuffle中的每個Shuffle Map Task都會生成多個Shuffle中間檔案的問題,Spark Consolidate Shuffle採用同一個Core上執行的Shuffle Map Task輸出的中間資料放在相同的中間檔案中。具體做法如下圖所示:Core 1上先執行一個Shuffle Map Task,然後改Shuffle Map Task會建立3個Shuffle中間檔案並完成Shuffle中間資料的寫入(此步驟和Spark Hash Shuffle一樣)。而對於Core 1上接下來執行的Shuffle Map Task則不會繼續建立Shuffle中間檔案,此時後繼的Shuffle Map Task會在已建立的Shuffle中間檔案中追加資料。
Spark Consolidate Shuffle將同一個Core上的Shuffle Map Task輸出的中間資料儲存到相同的Shuffle中間檔案中,有效的減少了中間的個數。假設參與Shuffle運算的核數記為 C,下游生成的Partition個數記為 P,則Shuffle過程生成的中間檔案個數為 C * P。這種方式雖然在一定程度上減少了Shuffle中間檔案的個數,但是當並行度較大時,仍然會帶來大量的磁碟IO。
三、 Spark Sort Based Shuffle
為了解決Shuffle過程中生成大量Shuffle中間檔案問題,Spark採用了Sort Based Shuffle。這種方式下,上游的每個Shuffle Map Task只會生成一個Shuffle中間檔案和一個Index索引檔案。Shuffle Map Task在處理資料時,藉助一個AppendOnlyMap資料結構來儲存中間資料,當達到閾值時,會將AppendOnlyMap中的資料溢寫到磁碟上生成一個SpillMap檔案,當Shuffle Map Task完成時,則會將生成的SpillMap檔案和記憶體中剩餘的資料進行合併成一個檔案。在此過程實際上會涉及到排序,但是Spark為了避免不必要的排序,只會在需要進行聚集運算時才會對資料進行排序,否則只會按照Partition ID進行排序。
Spar Sort Based Shuffle有效地解決了Shuffle過程中生成大量中間檔案的問題,假設Shuffle Map Task的個數為 M,則最終生成的中間檔案個數為 2 * M。