1. 程式人生 > >Spark(十二)--效能調優篇

Spark(十二)--效能調優篇

一段程式只能完成功能是沒有用的,只能能夠穩定、高效率地執行才是生成環境所需要的。

本篇記錄了Spark各個角度的調優技巧,以備不時之需。

一、配置引數的方式和觀察效能的方式

額。。。從最基本的開始講,可能一些剛接觸Spark的人不是很清楚Spark的一些引數變數到底要配置在哪裡。

可以通過三種方式配置引數,任選其一皆可。

  1. spark-env.sh檔案中配置:最近常使用的配置方式,格式可以參考其中的一些官方保留的配置。
  2. 程式中通過SparkConf配置:通過SparkConf物件set方法設定鍵值對,比較直觀。
  3. 程式中通過System.setProperty配置:和方法二差不多。

值得一提的是一個略顯詭異的現象,有些引數在spark-env.sh中配置並不起作用,反而要在程式中設定才有效果。

Spark的引數很多,一些預設的設定可以參考官網推薦的配置引數:/docs/latest/configuration.html

可以通過以下幾種方式來觀察Spark叢集的狀態和相關效能問題:

  1. Web UI:即8088埠進入的UI介面。
  2. Driver程式日誌:根據程式提交方式的不同到指定的節點上觀察Driver程式日誌。
  3. logs資料夾下的日誌:Spark叢集的大部分資訊都會記錄在這裡。
  4. works資料夾下的日誌:主要記錄Work節點的資訊。
  5. Profiler工具:沒有使用過。

前景交代完畢,下面進入正題:

二、排程與分割槽優化

1、小分割槽合併的問題

由於程式中過度使用filter運算元或者使用不當,都會造成大量的小分割槽出現。
因為每次過濾得到的結果只有原來資料集的一小部分,而這些量很小的資料同樣會以一定的分割槽數並行化分配到各個節點中執行。

帶來的問題就是:任務處理的資料量很小,反覆地切換任務所消耗的資源反而會帶來很大的系統開銷。

解決方案:使用重分割槽函式coalesce進行資料緊縮、減少分割槽數並設定shuffle=true保證任務是平行計算的

減少分割槽數,雖然意味著並行度降低,但是相對比之前的大量小任務過度切換的消耗,卻是比較值得的。

這裡也可以直接使用repartition重分割槽函式進行操作,因為其底層使用的是coalesce並設定Shuffle=true

2、資料傾斜問題

這是一個生產環境中經常遇到的問題,典型的場景是:大量的資料被分配到小部分節點計算,而其他大部分節點卻只計算小部分資料。

問題產生的原因有很多,可能且不全部包括:

  • key的資料分佈不均勻
  • 業務資料本身原因
  • 結構化表設計問題
  • 某些SQL語句會造成資料傾斜

可選的解決方案有:

  1. 增大任務數,減少分割槽數量:這種方法和解決小分割槽問題類似。
  2. 對特殊的key進行處理,如空值等:直接過濾掉空值的key以免對任務產生干擾。
  3. 使用廣播:小資料量直接廣播,大資料量先拆分之後再進行廣播。

還有一種場景是任務執行速度傾斜問題:叢集中其他節點都計算完畢了,但是隻有少數幾個節點死活執行不完。(其實這和上面的那個場景是差不多的)

解決方案:

  • 設定spark.speculation=true將執行事件過長的節點去掉,重新分配任務
  • spark.speculation.interval用來設定執行間隔

3、並行度調整

官方推薦每個CPU CORE分配2-3個任務。

  • 任務數太多:並行度太高,產生大量的任務啟動和切換開銷。
  • 任務數太低:並行度過低,無法發揮叢集平行計算能力,任務執行慢

Spark會根據檔案大小預設配置Map階段的任務數,所以我們能夠自行調整的就是Reduce階段的分割槽數了。

  • reduceByKey等操作時通過numPartitions引數進行分割槽數量配置。
  • 通過spark.default.parallelism進行預設分割槽數配置。

4、DAG排程執行優化

DAG圖是Spark計算的基本依賴,所以建議:

  1. 同一個Stage儘量容納更多地運算元,防止多餘的Shuffle。
  2. 複用已經cache的資料。

儘可能地在Transformation運算元中完成對資料的計算,因為過多的Action運算元會產生很多多餘的Shuffle,在劃分DAG圖時會形成眾多Stage。

三、網路傳輸優化

1、大任務分發問題

Spark採用Akka的Actor模型來進行訊息傳遞,包括資料、jar包和相關檔案等。

而Akka訊息通訊傳遞預設的容量最大為10M,一旦傳遞的訊息超過這個限制就會出現這樣的錯誤:

Worker任務失敗後Master上會列印“Lost TID:”

根據這個資訊找到對應的Worker節點後檢視SparkHome/work/目錄下的日誌,檢視Serialized size of result是否超過10M,就可以知道是不是Akka這邊的問題了。

一旦確認是Akka通訊容量限制之後,就可以通過配置spark.akka.frameSize控制Akka通訊訊息的最大容量。

2、Broadcast在調優場景的使用

Broadcast廣播,主要是用於共享Spark每個Task都會用到的一些只讀變數。

對於那些每個Task都會用到的變數來說,如果每個Task都為這些變數分配記憶體空間顯然會使用很多多餘的資源,使用廣播可以有效的避免這個問題,廣播之後,這些變數僅僅會在每臺機器上儲存一份,有Task需要使用時就到自己的機器上讀取就ok。

官方推薦,Task大於20k時可以使用,可以在控制檯上看Task的大小。

3、Collect結果過大的問題

大量資料時將資料儲存在HDFS上或者其他,不是大量資料,但是超出Akka傳輸的Buffer大小,通過配置spark.akka.frameSize調整。

四、序列化與壓縮

1、通過序列化手段優化

序列化之前說過,好處多多,所以是推薦能用就用,Spark上的序列化方式有幾種,具體的可以參考官方文件。

這裡只簡單介紹一下Kryo。

配置引數的時候使用spark.serializer=”org.apache.spark.serializer.KryoSerializer”配置

自定義定義可以被Kryo序列化的類的步驟:

  1. 自定義類extends KryoRegistrator
  2. 設定序列化方式conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)
  3. conf.set(“spark.kyro.registrator”,”自定義的class”)
  4. 如果物件佔用空間大,需要增加Kryo的緩衝區則配置spark.kryoserializer.buffer.mb上值預設為2M

2、通過壓縮手段優化

Spark的Job大致可以分為兩種:

  • I/O密集型:即存在大量讀取磁碟的操作。
  • CPU密集型:即存在大量的資料計算,使用CPU資源較多。

對於I/O密集型的Job,能壓縮就壓縮,因為讀磁碟的時候資料壓縮了,佔用空間小了,讀取速度不就快了。

對於CPU密集型的Job,看具體CPU使用情況再做決定,因為使用壓縮是需要消耗一些CPU資源的,如果當前CPU已經超負荷了,再使用壓縮反而適得其反。

Spark支援兩種壓縮演算法:

  • LZF:高壓縮比
  • Snappy:高速度

一些壓縮相關的引數配置:

  1. spark.broadcast.compress:推薦為true
  2. spark.rdd.compress:預設為false,看情況配置,壓縮花費一些時間,但是可以節省大量記憶體空間
  3. spark.io.compression.codec:org.apache.spark.io.LZFCompressionCodec根據情況選擇壓縮演算法
  4. spark.io.compressions.snappy.block.size:設定Snappy壓縮的塊大小

五、其他優化方式

1、對外部資源的批處理操作

如操作資料庫時,每個分割槽的資料應該一起執行一次批處理,而不是一條資料寫一次,即map=>mapPartition。

2、reduce和reduceByKey

reduce:內部呼叫了runJob方法,是一個action操作。
reduceByKey:內部只是呼叫了combineBykey,是Transformation操作。

大量的資料操作時,reduce彙總所有資料到主節點會有效能瓶頸,將資料轉換為Key-Value的形式使用reduceByKey實現邏輯,會做類似mr程式中的Combiner的操作,Transformation操作分散式進行。

3、Shuffle操作符的記憶體使用

使用會觸發Shuffle過程的操作符時,操作的資料集合太大造成OOM,每個任務執行過程中會在各自的記憶體建立Hash表來進行資料分組。

可以解決的方案可能有:

  • 增加並行度即分割槽數可以適當解決問題
  • 可以將任務數量擴充套件到超過叢集整體的CPU core數