Spark Shuffle過程分析
阿新 • • 發佈:2019-01-25
MapReduce的sort-based shuffle
之前我們提到了MapReduce的Shuffle方式,Spark Shuffle雖然採取了和MapReduce完全不一樣的機制,但深層的原理還是有相同的地方的。所以,為了更好地理解Spark Shuffle的執行機制,我們先對MapReduce的Shuffle過程進行一個簡單的回顧:
- 首先是對輸入檔案進行細化,這個過程是將一個大的file檔案分成若干個小的block(預設大小為64MB),block會進一步對映成split塊,split和block的關係預設是一對一的,也可以通過設定改為一對多的關係。
- 每一個map對於處理一個split分塊,處理完成之後會進行分割槽操作,分割槽使用的是partition的介面,預設的partition是對key值進行hash後再和reduce task的數量取模。partition主要目的在於給後續的資料走向確定一個分割槽號,即決定了map的輸出結果給哪個後續reduce進行處理。
- map端的shuffle包括了collect,sort,spill,merge的過程。collect是將map的處理結果放入一個環形的記憶體緩衝區,緩衝區集存入k-v對也存入對應的索引資料,sort排序是對索引進行partition和key值的排序,以便後續的spill到磁碟中的檔案是按照partition進行排列,並內部key值有序的。當記憶體達到一定閾值就會通過spillthread執行緒進行spill操作,若干人為設定了combine,則需要對每個分割槽中的資料進行聚集操作。後面對這些大量的磁碟檔案進行多路歸併的merge,merge產生的檔案仍然是每個partition內部key有序的,這樣便最終產生了map端的輸出檔案。
- reduce端的shuffle首先需要通過http進行資料的拽取copy,在這個過程中會一邊copy一邊sort,對copy完的資料還會在進行merge,以減少最後給reduce的輸入資料量。
Spark的hash-based shuffle
區別於MapReduce的shuffle過程分的比較細緻(partition、collect、spill、sort、merge),並且按照步驟順序執行,Spark由於其處理的基本單元rdd是惰性的,所以也具備了懶執行的特點。
MapReduce的Shuffle採用的是sort-base型別的,而Spark的Shuffle採取的是hash-based型別。以下是Spark Shuffle的詳細過程:
- Spark的Shuffle總體而言就包含兩個基本的過程:Shuffle write和Shuffle read。ShuffleMapTask的整個執行過程就是Shuffle write。hash-based機制就是在Shuffle的過程中寫資料時不做排序操作,區別於MapReduce。只是將資料根據Hash的結果,將各個Reduce分割槽的資料寫到各自的磁碟檔案中。
- 首先是將map的輸出結果送到對於的緩衝區bucket裡面,分配bucket的過程同樣也是個hash進行分割槽的過程,在hashed-based下每一個bucket對應一個最終的reducer,在處理完之後bucket裡的資料會自動劃分到reducer的bucket裡面。每個bucket裡的檔案會被寫入本地磁碟檔案ShuffleBlockFile中,形成一個FileSegment檔案。
- Shuffle read指的是reducer對屬於自己的FileSegment檔案進行fetch操作,這裡採用的netty框架,效率明顯好於Mapreduce的http傳輸。fetch操作會等到所有的Shuffle Write過程結束後再進行,這也是因為ShuffleMapTask可能並不在一個stage裡面,需要在父stage執行之後提交才會進行子stage的執行。reducer通過fetch得到的FileSegment先放在緩衝區softBuffer中,預設大小48MB。Spark不要求Shuffle後的資料是全域性有序的,所以沒有必要等到shuffle read全部結束後再進行reduce,是可以並行處理的。