1. 程式人生 > >Spark shuffle調優

Spark shuffle調優

ces 傳輸 shuff res spark性能 模式 shuffle過程 圖片 連接

Spark shuffle是什麽
Shuffle在Spark中即是把父RDD中的KV對按照Key重新分區,從而得到一個新的RDD。也就是說原本同屬於父RDD同一個分區的數據需要進入到子RDD的不同的分區。

現在的spark版本默認使用的是sortshuffle;
shuffle在哪裏產生

shuffle在spark的算子中產生,也就是運行task的時候才會產生shuffle.

sortShuffleManagerspark

shuffle的默認計算引擎叫sortshuffleManager,它負責shuffle過程的執行、計算和組件的處理,sortshuffleManager會將task進行shuffle操作時產生的臨時磁盤文件合並成一個磁盤文件,在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。

sortshuffle的內部機制
技術分享圖片

  1. 數據會根據不同的shuffle算子存儲到map數據結構(如reduceByKey)或者array數據結構(join);不過Map是一邊聚合,一邊寫入內存,array是直接寫入內存. 當內存達到一個閾值,就會溢出寫到磁盤,因此在溢出這個環節會在磁盤上產生多個臨時文件,磁盤上的這些文件需要合並,於是spark就有了merge機制.
  2. 在溢寫到磁盤之前,在內存中會按照key來排序,排序過後會進入到一個buffer緩沖區,默認為32K,緩沖區的batch默認為1萬條key,也就是緩沖區以每次一萬條的量寫入到磁盤文件中,該緩沖區減少IO,提高性能. 緩沖區和寫入磁盤使用的技術是java中的BufferedOutputStream.

  3. merge會將之前產生的所有的臨時文件進行合並,包括緩沖區讀寫到磁盤上的文件,合並成一個大的文件到磁盤,默認為48M,與這個文件相對於的還有一個索引文件,索引文件裏面記錄的是這個文件的元信息,且這個磁盤文件也是下遊stage的Task的輸入信息! ?? 註: 一個下遊的task對應一個磁盤文件和這個磁盤文件的元信息. 於是就有了血統,繼承之類的!

shuffle當中可能會遇到的問題

  1. 數據量非常大,從其他各臺機器收集數據占用大量網絡。
  2. 數據如何分類,即如何Partition,Hash、Sort等;
  3. 負載均衡(數據傾斜),因為采用不同的Shuffle方式對數據不同的分類,而分類之後又要跑到具體的節點上計算,如果不恰當的話,很容易產生數據傾斜;
  4. 網絡傳輸效率,需要在壓縮和解壓縮之間做出權衡,序列化和反序列也是要考慮的問題;
    說明:具體的Task進行計算的時候盡一切最大可能使得數據具備Process Locality的特性;退而求次是增加數據分片,減少每個Task處理的數據量。

shuffle調優
shuffle調優分為兩種,一種是shuffle參數根據實際情況調優,一種是代碼開發調優,代碼開發調優我在spark性能調優裏面去寫!

  1. spark.shuffle.file.buffer(默認值為32K,每次出貨1萬條)該參數是緩沖區的緩沖內存,如果可用的內存資源較為充足的話,可以將緩沖區的值設置大點,這樣會較少磁盤IO次數.,如果合理調節該參數,性能會提升1%~5%...? 可以設置為64K.
  2. spark.reducer.maxSizeInFlight(默認為48M)該參數是stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接等操作,如果合理調節該參數(增大),性能會提升1%~5%...
  3. spark.shuffle.io.maxRetries(默認3次)該參數是stage的task向上一個stage的task計算結果拉取數據,也就是上面那個操作,有時候會因為網絡異常原因,導致拉取失敗,失敗時候默認重新拉取三次,三次過還是失敗的話作業就執行失敗了,根據具體的業務可以考慮將默認值增大,這樣可以避免由於JVM的一些原因或者網絡不穩定等因素導致的數據拉取失敗.也有助於提高spark作業的穩定性. 可以適當的提升重新拉取的次數,最大為60次.
  4. spark.shuffle.io.retryWait(默認為5s)該參數和上面一樣,是每次拉取數據的間隔時間...? 調優建議:建議加大間隔時長(比如20s),以增加shuffle操作的穩定性
  5. spark.shuffle.memoryFraction(默認0.2,也就是20%)該參數是數據根據不同的shuffle算子將數據寫入內存結構中,內存結構達到閾值會溢出臨時文件,這個參數就是則是內存結構的閾值百分比的,不是內存結構的內存大小.? 如果內存充足,而且很少使用持久化操作,建議調高這個比例,可以減少頻繁對磁盤進行IO操作,合理調節該參數可以將性能提升10%左右.
  6. spark.shuffle.manager(默認sort)該參數是設置shuffle的類型,默認是sort,也就是sortshuffleManager, hash參數對應HashShuffleManager, tungsten-sort參數對應tungsten(這個很少用),HashShuffleManager是以前的版本,這個默認就行,
  7. spark.shuffle.sort.bypassMergeThreshold(默認200個)該參數是如果shuffle read task的數量小於等於200個的時候,在sortshufflemanager模式下,就會啟動ByPass sortshufflemanager...這個調優就這樣把 ,默認200挺好的.
  8. spark.shuffle.consolidateFiles(默認為false)該參數只對HashshuffleManager有效,而HashshuffleManager是spark1.2之前默認使用的版本...

Spark shuffle調優