1. 程式人生 > >spark性能調優(二) 徹底解密spark的Hash Shuffle

spark性能調優(二) 徹底解密spark的Hash Shuffle

弱點 sta 出了 寫到 三方 很大的 完成 map 重新

裝載:http://www.cnblogs.com/jcchoiling/p/6431969.html

引言

Spark HashShuffle 是它以前的版本,現在1.6x 版本默應是 Sort-Based Shuffle,那為什麽要講 HashShuffle 呢,因為有分布式就一定會有 Shuffle,而且 HashShuffle 是 Spark以前的版本,亦即是 Sort-Based Shuffle 的前身,因為有 HashShuffle 的不足,才會有後續的 Sorted-Based Shuffle,以及現在的 Tungsten-Sort Shuffle,所以我們有必要去了解它。

人們對Spark的印象往往是基於內存進行計算,但實際上來講,Spark可以基於內存、也可以基於磁盤或者是第三方的儲存空間進行計算

,背後有兩層含意,第一、Spark框架的架構設計和設計模式上是傾向於在內存中計算數據的,第二、這也表達了人們對數據處理的一種美好的願望,就是希望計算數據的時候,數據就在內存中

為什麽再一次強調 Shuffle 是 Spark 的性能殺手啦,那不就是說,Spark中的 “Shuffle“ 和 “Spark完全是基於內存計算“ 的願景是相違背的!!!希望這篇文章能為讀者帶出以下的啟發:

  • 了解為什麽 Shuffle 是分布式系統的天敵
  • 了解 Spark HashShuffle的原理和機制
  • 了解優化後 Spark Consolidated HashShuffle的原理和機制
  • 了解Shuffle 是如何成為 Spark 性能殺手
  • 了解可以從那幾方面思考 Spark Shuffle 的性能調優
  • 了解 Spark HashShuffle 在讀、寫磁盤這個過程的源碼鑒賞

一、shuffle是分布式系統的天敵

Spark 運行分成兩部份,第一部份是 Driver Program,裏面的核心是 SparkContext,它驅動著一個程序的開始,負責指揮,另外一部份是 Worker 節點上的 Task,它是實際運行任務的,當程序運行時,不間斷地由 Driver 與所在的進程進行交互,交互什麽,有幾點,第一、是讓你去幹什麽,第二、是具體告訴 Task 數據在那裏,例如說有三個 Stage,第二個 Task 要拿數據,它就會向 Driver 要數據,所以在整個工作的過程中,Executor 中的 Task 會不斷地與 Driver 進行溝通,這是一個網絡傳輸的過程。

技術分享

在這個過程中一方面是 Driver 跟 Executor 進行網絡傳輸,另一方面是Task要從 Driver 抓取其他上遊的 Task 的數據結果,所以有這個過程中就不斷的產生網絡結果。其中,下一個 Stage 向上一個 Stage 要數據這個過程,我們就稱之為 Shuffle。

思考點:上一個 Stage 為什麽要向下一個 Stage 發數據?假設現在有一個程序,裏面有五個 Stage,我把它看成為一個很大的 Stage,在分布式系統中,數據分布在不同的節點上,每一個節點計算一部份數據,如果不對各個節點上獨立的部份進行匯聚的話,我們是計算不到最終的結果。這就是因為我們需要利用分布式來發揮它本身並行計算的能力,而後續又需要計算各節點上最終的結果,所以需要把數據匯聚集中,這就會導致 Shuffle,這也是說為什麽 Shuffle 是分布式不可避免的命運。

二、spark中的Hash Shuffle介紹

1、 原始的Hash Shuffle機制

基於 Mapper 和 Reducer 理解的基礎上,當 Reducer 去抓取數據時,它的 Key 到底是怎麽分配的,核心思考點是:作為上遊數據是怎麽去分配給下遊數據的。在這張圖中你可以看到有4個 Task 在2個 Executors 上面,它們是並行運行的,Hash 本身有一套 Hash算法,可以把數據的 Key 進行重新分類,每個 Task 對數據進行分類然後把它們不同類別的數據先寫到本地磁盤,然後再經過網絡傳輸 Shuffle,把數據傳到下一個 Stage 進行匯聚。

下圖有3個 Reducer,從 Task 開始那邊各自把自己進行 Hash 計算,分類出3個不同的類別,每個 Task 都分成3種類別的數據,剛剛提過因為分布式的關系,我們想把不同的數據匯聚然後計算出最終的結果,所以下遊的 Reducer 會在每個 Task 中把屬於自己類別的數據收集過來,匯聚成一個同類別的大集合,抓過來的時候會首先放在內存中,但內存可能放不下,也有可能放在本地 (這也是一個調優點。可以參考上一章講過的一些調優參數),每1個 Task 輸出3份本地文件,這裏有4個 Mapper Tasks,所以總共輸出了4個 Tasks x 3個分類文件 = 12個本地小文件

技術分享

Hash Shuffle 也有它的弱點:

  1. Shuffle前在磁盤上會產生海量的小文件,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)
  2. 內存不夠用,由於內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題。

2、 優化後的Hash Shuffle 機制

在剛才 HashShuffle 的基礎上思考該如何進行優化,這是優化後的實現:

技術分享

這裏還是有4個Tasks,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裏,然後把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據)每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裏有4個Mapper Tasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。Consoldiated Hash-Shuffle的優化有一個很大的好處就是假設現在有200個Mapper Tasks在同一個進程中,也只會產生3個本地小文件; 如果用原始的 Hash-Based Shuffle 的話,200個Mapper Tasks 會各自產生3個本地小文件,在一個進程已經產生了600個本地小文件。3個對比600已經是一個很大的差異了。

這個優化後的 HashShuffle 叫 ConsolidatedShuffle,在實際生產環境下可以調以下參數:

spark.shuffle.consolidateFiles=true

 Consolidated HashShuffle 也有它的弱點:

  1. 如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。

3、Shuffle是如何成為Spark性能殺手及調優點思考

Shuffle 不可以避免是因為在分布式系統中的基本點就是把一個很大的的任務/作業分成一百份或者是一千份,這一百份和一千份文件在不同的機器上獨自完成各自不同的部份,我們是針對整個作業要結果,所以在後面會進行匯聚,這個匯聚的過程的前一階段到後一階段以至網絡傳輸的過程就叫 Shuffle。在 Spark 中為了完成 Shuffle 的過程會把真正的一個作業劃分為不同的 Stage,這個Stage 的劃分是跟據依賴關系去決定的,Shuffle 是整個 Spark 中最消耗性能的一個地方。試試想想如果沒有 Shuffle 的話,Spark可以完成一個純內存式的操作。

reduceByKey,它會把每個 Key 對應的 Value 聚合成一個 value 然後生成新的 RDD

  

Shuffle 是如何破壞了純內存操作呢,因為在不同節點上我們要進行數據傳輸,數據在通過網絡發送之前,要先存儲在內存中,內存達到一定的程度,它會寫到本地磁盤,(在以前 Spark 的版本它沒有Buffer 的限制,會不斷地寫入 Buffer 然後等內存滿了就寫入本地,現在的版本對 Buffer 多少設定了限制,以防止出現 OOM,減少了 IO)

Mapper 端會寫入內存 Buffer,這個便關乎到 GC 的問題,然後 Mapper端的 Block 要寫入本地,大量的磁盤與IO的操作和磁盤與網絡IO的操作,這就構成了分布式的性能殺手。

如果要對最終計算結果進行排序的話,一般會都會進行 sortByKey,如果以最終結果來思考的話,你可以認為是產生了一個很大很大的 partition,你可以用 reduceByKey 的時候指定它的並行度,例如你把 reduceByKey 的並行度變成為1,新 RDD 的數據切片就變成1,排序一般都會在很多節點上,如果你把很多節點變成一個節點然後進行排序,有時候會取得更好的效果,因為數據就在一個節點上,技術層面來講就只需要在一個進程裏進行排序。

可以在調用 reduceByKey()接著調用 mapPartition( );
也可以用 repartitionAndSortWithPartitions( ); 

  還有一個很危險的地方就是數據傾斜,在我們談的 Shuffle 機制中,不斷強調不同機器從Mapper端抓取數據並計算結果,但有沒有意會到數據可能會分布不均衡,什麽時候會導致數據傾斜,答案就是 Shuffle 時會導政數據分布不均衡,也就是數據傾斜的問題。數據傾斜的問題會引申很多其他問題,比如,網絡帶寬、各重硬件故障、內存過度消耗、文件掉失。因為 Shuffle 的過程中會產生大量的磁盤 IO、網絡 IO、以及壓縮、解壓縮、序列化和反序列化等等

4、Shuffle 性能調優思考

Shuffle可能面臨的問題,運行 Task 的時候才會產生 Shuffle (Shuffle 已經融化在 Spark 的算子中)

  1. 幾千臺或者是上萬臺的機器進行匯聚計算,數據量會非常大,網絡傳輸會很大
  2. 數據如何分類其實就是 partition,即如何 Partition、Hash 、Sort 、計算
  3. 負載均衡 (數據傾斜)
  4. 網絡傳輸效率,需要壓縮或解壓縮之間做出權衡,序列化 和 反序列化也是要考慮的問題

具體的 Task 進行計算的時候盡一切最大可能使得數據具備 Process Locality 的特性,退而求其次是增加數據分片,減少每個 Task 處理的數據量,基於Shuffle 和數據傾斜所導致的一系列問題,可以延伸出很多不同的調優點,比如說:

  • Mapper端的 Buffer 應該設置為多大呢?
  • Reducer端的 Buffer 應該設置為多大呢?如果 Reducer 太少的話,這會限制了抓取多少數據
  • 在數據傳輸的過程中是否有壓縮以及該用什麽方式去壓縮,默應是用 snappy 的壓縮方式。
  • 網絡傳輸失敗重試的次數,每次重試之間間隔多少時間。

spark性能調優(二) 徹底解密spark的Hash Shuffle