spark.shuffle調優
1.1.1 spark.shuffle.manager
Spark1.2.0官方支援兩種方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0之前僅支援Hash Based Shuffle。Spark 1.1的時候引入了Sort Based Shuffle。Spark 1.2的預設Shuffle機制從Hash變成了Sort。如果需要Hash Based Shuffle,可以將spark.shuffle.manager設定成“hash”即可。
如果對效能有比較苛刻的要求,那麼就要理解這兩種不同的Shuffle機制的原理,結合具體的應用場景進行選擇。
Hash Based Shuffle,就是將資料根據Hash的結果,將各個Reducer partition的資料寫到單獨的檔案中去,寫資料時不會有排序的操作。這個問題就是如果Reducer的partition比較多的時候,會產生大量的磁碟檔案。這會帶來兩個問題:
1) 同時開啟的檔案比較多,那麼大量的檔案控制代碼和寫操作分配的臨時記憶體會非常大,對於記憶體的使用和GC帶來很多的壓力。尤其是在Sparkon YARN的模式下,Executor分配的記憶體普遍比較小的時候,這個問題會更嚴重。
2) 從整體來看,這些檔案帶來大量的隨機讀,讀效能可能會遇到瓶頸。
Sort Based Shuffle會根據實際情況對資料採用不同的方式進行Sort。這個排序可能僅僅是按照Reducer的partition進行排序,保證同一個Shuffle Map Task的對應於不同的Reducer的partition的資料都可以寫到同一個資料檔案,通過一個Offset來標記不同的Reducer partition的分界。因此一個Shuffle Map Task僅僅會生成一個數據檔案(還有一個index索引檔案),從而避免了Hash Based Shuffle檔案數量過多的問題。
選擇Hash還是Sort,取決於記憶體,排序和檔案操作等因素的綜合影響。
對於不需要進行排序的Shuffle而且Shuffle產生的檔案數量不是特別多,Hash Based Shuffle可能是個更好的選擇;畢竟Sort Based Shuffle至少會按照Reducer的partition進行排序。
而Sort BasedShuffle的優勢就在於Scalability,它的出現實際上很大程度上是解決Hash Based Shuffle的Scalability的問題。由於Sort Based Shuffle還在不斷的演進中,因此Sort Based Shuffle的效能會得到不斷的改善。
對選擇那種Shuffle,如果對於效能要求苛刻,最好還是通過實際的場景中測試後再決定。不過選擇預設的Sort,可以滿足大部分的場景需要。
1.1.2 spark.shuffle.spill
這個引數的預設值是true,用於指定Shuffle過程中如果記憶體中的資料超過閾值(參考spark.shuffle.memoryFraction的設定),那麼是否需要將部分資料臨時寫入外部儲存。如果設定為false,那麼這個過程就會一直使用記憶體,會有Out Of Memory的風險。因此只有在確定記憶體足夠使用時,才可以將這個選項設定為false。
對於Hash BasedShuffle的Shuffle Write過程中使用的org.apache.spark.util.collection.AppendOnlyMap就是全記憶體的方式,而org.apache.spark.util.collection.ExternalAppendOnlyMap對org.apache.spark.util.collection.AppendOnlyMap有了進一步的封裝,在記憶體使用超過閾值時會將它spill到外部儲存,在最後的時候會對這些臨時檔案進行Merge。
而Sort BasedShuffle Write使用到的org.apache.spark.util.collection.ExternalSorter也會有類似的spill。
而對於ShuffleRead,如果需要做aggregate,也可能在aggregate的過程中將資料spill的外部儲存。
1.1.3 spark.shuffle.memoryFraction和spark.shuffle.safetyFraction
在啟用spark.shuffle.spill的情況下,spark.shuffle.memoryFraction決定了當Shuffle過程中使用的記憶體達到總記憶體多少比例的時候開始Spill。在Spark 1.2.0裡,這個值是0.2。通過這個引數可以設定Shuffle過程佔用記憶體的大小,它直接影響了Spill的頻率和GC。
如果Spill的頻率太高,那麼可以適當的增加spark.shuffle.memoryFraction來增加Shuffle過程的可用記憶體數,進而減少Spill的頻率。當然為了避免OOM(記憶體溢位),可能就需要減少RDD cache所用的記憶體,即需要減少spark.storage.memoryFraction的值;但是減少RDD cache所用的記憶體有可能會帶來其他的影響,因此需要綜合考量。
在Shuffle過程中,Shuffle佔用的記憶體數是估計出來的,並不是每次新增的資料項都會計算一次佔用的記憶體大小,這樣做是為了降低時間開銷。但是估計也會有誤差,因此存在實際使用的記憶體數比估算值要大的情況,因此引數 spark.shuffle.safetyFraction作為一個保險係數降低實際Shuffle過程所需要的記憶體值,降低實際記憶體超出使用者配置值的風險。
1.1.4 spark.shuffle.sort.bypassMergeThreshold
這個配置的預設值是200,用於設定在Reducer的partition數目少於多少的時候,Sort Based Shuffle內部不使用Merge Sort的方式處理資料,而是直接將每個partition寫入單獨的檔案。這個方式和Hash Based的方式是類似的,區別就是在最後這些檔案還是會合併成一個單獨的檔案,並通過一個index索引檔案來標記不同partition的位置資訊。從Reducer看來,資料檔案和索引檔案的格式和內部是否做過Merge Sort是完全相同的。
這個可以看做SortBased Shuffle在Shuffle量比較小的時候對於Hash Based Shuffle的一種折衷。當然了它和Hash Based Shuffle一樣,也存在同時開啟檔案過多導致記憶體佔用增加的問題。因此如果GC比較嚴重或者記憶體比較緊張,可以適當的降低這個值。
1.1.5 spark.shuffle.blockTransferService
在Spark 1.2.0,這個配置的預設值是netty,而之前是nio。這個主要是用於在各個Executor之間傳輸Shuffle資料。Netty的實現更加簡潔,但實際上使用者不用太關心這個選項。除非是有特殊的需求,否則採用預設配置就可以。
1.1.6 spark.shuffle.consolidateFiles
這個配置的預設配置是false。主要是為了解決在Hash Based Shuffle過程中產生過多檔案的問題。如果配置選項為true,那麼對於同一個Core上執行的Shuffle Map Task不會新產生一個Shuffle檔案而是重用原來的。但是每個Shuffle Map Task還是需要產生下游Task數量的檔案,因此它並沒有減少同時開啟檔案的數量。如果需要了解更加詳細的細節,可以閱讀7.1節。
但是consolidateFiles的機制在Spark 0.8.1就引入了,到Spark 1.2.0還是沒有穩定下來。從原始碼實現的角度看,實現原始碼是非常簡單的,但是由於涉及本地的檔案系統等限制,這個策略可能會帶來各種各樣的問題。由於它並沒有減少同時開啟檔案的數量,因此不能減少由檔案控制代碼帶來的記憶體消耗。如果面臨Shuffle的檔案數量非常大,那麼是否開啟這個選項最好還是通過實際測試後再決定。
1.1.7 spark.shuffle.service.enabled
(false)
1.1.8 spark.shuffle.compress和 spark.shuffle.spill.compress
這兩個引數的預設配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用來設定Shuffle過程中是否對Shuffle資料進行壓縮;其中前者針對最終寫入本地檔案系統的輸出檔案,後者針對在處理過程需要spill到外部儲存的中間資料,後者針對最終的shuffle輸出檔案。
如何設定spark.shuffle.compress?
如果下游的Task通過網路獲取上游Shuffle Map Task的結果的網路IO成為瓶頸,那麼就需要考慮將它設定為true:通過壓縮資料來減少網路IO。由於上游Shuffle Map Task和下游的Task現階段是不會並行處理的,即上游Shuffle Map Task處理完成,然後下游的Task才會開始執行。因此如果需要壓縮的時間消耗就是Shuffle MapTask壓縮資料的時間 + 網路傳輸的時間 + 下游Task解壓的時間;而不需要壓縮的時間消耗僅僅是網路傳輸的時間。因此需要評估壓縮解壓時間帶來的時間消耗和因為資料壓縮帶來的時間節省。如果網路成為瓶頸,比如叢集普遍使用的是千兆網路,那麼可能將這個選項設定為true是合理的;如果計算是CPU密集型的,那麼可能將這個選項設定為false才更好。
如何設定spark.shuffle.spill.compress?
如果設定為true,代表處理的中間結果在spill到本地硬碟時都會進行壓縮,在將中間結果取回進行merge的時候,要進行解壓。因此要綜合考慮CPU由於引入壓縮解壓的消耗時間和Disk IO因為壓縮帶來的節省時間的比較。在Disk IO成為瓶頸的場景下,這個被設定為true可能比較合適;如果本地硬碟是SSD,那麼這個設定為false可能比較合適。
1.1.9 spark.reducer.maxMbInFlight
這個引數用於限制一個ReducerTask向其他的Executor請求Shuffle資料時所佔用的最大記憶體數,尤其是如果網絡卡是千兆和千兆以下的網絡卡時。預設值是48MB。設定這個值需要中和考慮網絡卡頻寬和記憶體。