Spark效能調優-RDD運算元調優篇(深度好文,面試常問,建議收藏)
阿新 • • 發佈:2021-03-05
## RDD運算元調優
不廢話,直接進入正題!
#### 1. RDD複用
在對RDD進行運算元時,要避免相同的運算元和計算邏輯之下對RDD進行重複的計算,如下圖所示:
![RDD的重複計算](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_1.png)
對上圖中的RDD計算架構進行修改,得到如下圖所示的優化結果:
![RDD架構優化](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_2.png)
#### 2. 儘早filter
獲取到初始RDD後,應該考慮**儘早地過濾掉不需要的資料**,進而減少對記憶體的佔用,從而提升Spark作業的執行效率。
> 本文首發於公眾號:五分鐘學大資料,歡迎圍觀
#### 3. 讀取大量小檔案-用wholeTextFiles
當我們將一個文字檔案讀取為 RDD 時,輸入的每一行都會成為RDD的一個元素。
也可以將多個完整的文字檔案一次性讀取為一個pairRDD,其中鍵是檔名,值是檔案內容。
```java
val input:RDD[String] = sc.textFile("dir/*.log")
```
如果傳遞目錄,則將目錄下的所有檔案讀取作為RDD。檔案路徑支援萬用字元。
但是這樣對於大量的小檔案讀取效率並不高,應該使用 **wholeTextFiles**
返回值為RDD[(String, String)],其中Key是檔案的名稱,Value是檔案的內容。
```java
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
```
***
wholeTextFiles讀取小檔案:
```java
val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
```
#### 4. mapPartition和foreachPartition
- **mapPartitions**
map(_....) 表示每一個元素
mapPartitions(_....) 表示每個分割槽的資料組成的迭代器
普通的map運算元對RDD中的每一個元素進行操作,而mapPartitions運算元對RDD中每一個分割槽進行操作。
如果是普通的map運算元,假設一個partition有1萬條資料,那麼map運算元中的function要執行1萬次,也就是對每個元素進行操作。
![map 運算元](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_3.png)
如果是mapPartition運算元,由於一個task處理一個RDD的partition,那麼一個task只會執行一次function,function一次接收所有的partition資料,效率比較高。
![mapPartition 運算元](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_4.png)
比如,**當要把RDD中的所有資料通過JDBC寫入資料,如果使用map運算元,那麼需要對RDD中的每一個元素都建立一個數據庫連線,這樣對資源的消耗很大,如果使用mapPartitions運算元,那麼針對一個分割槽的資料,只需要建立一個數據庫連線**。
mapPartitions運算元也存在一些缺點:對於普通的map操作,一次處理一條資料,如果在處理了2000條資料後記憶體不足,那麼可以將已經處理完的2000條資料從記憶體中垃圾回收掉;但是如果使用mapPartitions運算元,但資料量非常大時,function一次處理一個分割槽的資料,如果一旦記憶體不足,此時無法回收記憶體,就可能會OOM,即記憶體溢位。
因此,**mapPartitions運算元適用於資料量不是特別大的時候,此時使用mapPartitions運算元對效能的提升效果還是不錯的**。(當資料量很大的時候,一旦使用mapPartitions運算元,就會直接OOM)
在專案中,應該首先估算一下RDD的資料量、每個partition的資料量,以及分配給每個Executor的記憶體資源,如果資源允許,可以考慮使用mapPartitions運算元代替map。
- **foreachPartition**
rrd.foreache(_....) 表示每一個元素
rrd.forPartitions(_....) 表示每個分割槽的資料組成的迭代器
在生產環境中,通常使用foreachPartition運算元來完成資料庫的寫入,通過foreachPartition運算元的特性,可以優化寫資料庫的效能。
如果使用foreach運算元完成資料庫的操作,由於foreach運算元是遍歷RDD的每條資料,因此,每條資料都會建立一個數據庫連線,這是對資源的極大浪費,因此,**對於寫資料庫操作,我們應當使用foreachPartition運算元**。
與mapPartitions運算元非常相似,foreachPartition是將RDD的每個分割槽作為遍歷物件,一次處理一個分割槽的資料,也就是說,如果涉及資料庫的相關操作,一個分割槽的資料只需要建立一次資料庫連線,如下圖所示:
![foreachPartition 運算元](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_5.png)
使用了foreachPartition 運算元後,可以獲得以下的效能提升:
1. 對於我們寫的function函式,一次處理一整個分割槽的資料;
2. 對於一個分割槽內的資料,建立唯一的資料庫連線;
3. 只需要向資料庫傳送一次SQL語句和多組引數;
**在生產環境中,全部都會使用foreachPartition運算元完成資料庫操作。foreachPartition運算元存在一個問題,與mapPartitions運算元類似,如果一個分割槽的資料量特別大,可能會造成OOM,即記憶體溢位**。
#### 5. filter+coalesce/repartition(減少分割槽)
在Spark任務中我們經常會使用filter運算元完成RDD中資料的過濾,在任務初始階段,從各個分割槽中載入到的資料量是相近的,但是一旦進過filter過濾後,每個分割槽的資料量有可能會存在較大差異,如下圖所示:
![分割槽資料過濾結果](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_6.png)
根據上圖我們可以發現兩個問題:
1. 每個partition的資料量變小了,如果還按照之前與partition相等的task個數去處理當前資料,有點浪費task的計算資源;
2. 每個partition的資料量不一樣,會導致後面的每個task處理每個partition資料的時候,每個task要處理的資料量不同,這很有可能導致資料傾斜問題。
如上圖所示,第二個分割槽的資料過濾後只剩100條,而第三個分割槽的資料過濾後剩下800條,在相同的處理邏輯下,第二個分割槽對應的task處理的資料量與第三個分割槽對應的task處理的資料量差距達到了8倍,這也會導致執行速度可能存在數倍的差距,這也就是**資料傾斜問題**。
針對上述的兩個問題,我們分別進行分析:
1. 針對第一個問題,既然分割槽的資料量變小了,我們希望可以對分割槽資料進行重新分配,比如將原來4個分割槽的資料轉化到2個分割槽中,這樣只需要用後面的兩個task進行處理即可,避免了資源的浪費。
2. 針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分割槽資料重新分配,讓每個partition中的資料量差不多,這就避免了資料傾斜問題。
那麼具體應該如何實現上面的解決思路?我們需要coalesce運算元。
repartition與coalesce都可以用來進行重分割槽,其中repartition只是coalesce介面中shuffle為true的簡易實現,coalesce預設情況下不進行shuffle,但是可以通過引數進行設定。
假設我們希望將原本的分割槽個數A通過重新分割槽變為B,那麼有以下幾種情況:
1. A > B(多數分割槽合併為少數分割槽)
- A與B相差值不大
此時使用coalesce即可,無需shuffle過程。
- A與B相差值很大
此時可以使用coalesce並且不啟用shuffle過程,但是會導致合併過程效能低下,所以推薦設定coalesce的第二個引數為true,即啟動shuffle過程。
2. A < B(少數分割槽分解為多數分割槽)
此時使用repartition即可,如果使用coalesce需要將shuffle設定為true,否則coalesce無效。
**我們可以在filter操作之後,使用coalesce運算元針對每個partition的資料量各不相同的情況,壓縮partition的數量,而且讓每個partition的資料量儘量均勻緊湊,以便於後面的task進行計算操作,在某種程度上能夠在一定程度上提升效能**。
注意:local模式是程序內模擬叢集執行,已經對並行度和分割槽數量有了一定的內部優化,因此不用去設定並行度和分割槽數量。
#### 6. 並行度設定
**Spark作業中的並行度指各個stage的task的數量**。
如果並行度設定不合理而導致並行度過低,會導致資源的極大浪費,例如,20個Executor,每個Executor分配3個CPU core,而Spark作業有40個task,這樣每個Executor分配到的task個數是2個,這就使得每個Executor有一個CPU core空閒,導致資源的浪費。
理想的並行度設定,應該是讓並行度與資源相匹配,簡單來說就是在資源允許的前提下,並行度要設定的儘可能大,達到可以充分利用叢集資源。合理的設定並行度,可以提升整個Spark作業的效能和執行速度。
Spark官方推薦,**task數量應該設定為Spark作業總CPU core數量的2~3倍**。之所以沒有推薦task數量與CPU core總數相等,是因為task的執行時間不同,有的task執行速度快而有的task執行速度慢,如果task數量與CPU core總數相等,那麼執行快的task執行完成後,會出現CPU core空閒的情況。如果task數量設定為CPU core總數的2~3倍,那麼一個task執行完畢後,CPU core會立刻執行下一個task,降低了資源的浪費,同時提升了Spark作業執行的效率。
Spark作業並行度的設定如下:
```java
val conf = new SparkConf().set("spark.default.parallelism", "500")
```
原則:**讓 cpu 的 Core(cpu 核心數) 充分利用起來,
如有100個 Core,那麼並行度可以設定為200~300**。
#### 7. repartition/coalesce調節並行度
我們知道 Spark 中有並行度的調節策略,但是,**並行度的設定對於Spark SQL是不生效的,使用者設定的並行度只對於Spark SQL以外的所有Spark的stage生效**。
Spark SQL的並行度不允許使用者自己指定,Spark SQL自己會預設根據hive表對應的HDFS檔案的split個數自動設定Spark SQL所在的那個stage的並行度,使用者自己通 **spark.default.parallelism** 引數指定的並行度,只會在沒Spark SQL的stage中生效。
由於Spark SQL所在stage的並行度無法手動設定,如果資料量較大,並且此stage中後續的transformation操作有著複雜的業務邏輯,而Spark SQL自動設定的task數量很少,這就意味著每個task要處理為數不少的資料量,然後還要執行非常複雜的處理邏輯,這就可能表現為第一個有Spark SQL的stage速度很慢,而後續的沒有Spark SQL的stage執行速度非常快。
為了解決Spark SQL無法設定並行度和task數量的問題,我們可以使用repartition運算元。
repartition 運算元使用前後對比圖如下:
![repartition 運算元使用前後對比圖](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_7.png)
**Spark SQL這一步的並行度和task數量肯定是沒有辦法去改變了,但是,對於Spark SQL查詢出來的RDD,立即使用repartition運算元,去重新進行分割槽,這樣可以重新分割槽為多個partition,從repartition之後的RDD操作,由於不再涉及Spark SQL,因此stage的並行度就會等於你手動設定的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量資料並執行復雜的演算法邏輯。使用repartition運算元的前後對比如上圖所示**。
#### 8. reduceByKey本地預聚合
**reduceByKey相較於普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合**,map端會先對本地的資料進行combine操作,然後將資料寫入給下個stage的每個task建立的檔案中,也就是在map端,對每一個key對應的value,執行reduceByKey運算元函式。
reduceByKey運算元的執行過程如下圖所示:
![reduceByKey 運算元執行過程](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_8.png)
使用reduceByKey對效能的提升如下:
1. 本地聚合後,在map端的資料量變少,減少了磁碟IO,也減少了對磁碟空間的佔用;
2. 本地聚合後,下一個stage拉取的資料量變少,減少了網路傳輸的資料量;
3. 本地聚合後,在reduce端進行資料快取的記憶體佔用減少;
4. 本地聚合後,在reduce端進行聚合的資料量減少。
基於reduceByKey的本地聚合特徵,我們應該考慮使用reduceByKey代替其他的shuffle運算元,例如groupByKey。
groupByKey與reduceByKey的執行原理如下圖1和圖2所示:
![圖1:groupByKey原理](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_9.png)
![圖2:reduceByKey原理](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210228_10.png)
根據上圖可知,groupByKey不會進行map端的聚合,而是將所有map端的資料shuffle到reduce端,然後在reduce端進行資料的聚合操作。由於reduceByKey有map端聚合的特性,使得網路傳輸的資料量減小,因此效率要明顯高於groupByKey。
#### 9. 使用持久化+checkpoint
Spark持久化在大部分情況下是沒有問題的,但是有時資料可能會丟失,如果資料一旦丟失,就需要對丟失的資料重新進行計算,計算完後再快取和使用,為了避免資料的丟失,可以選擇對這個RDD進行checkpoint,也就是**將資料持久化一份到容錯的檔案系統上(比如HDFS)**。
一個RDD快取並checkpoint後,如果一旦發現快取丟失,就會優先檢視checkpoint資料存不存在,如果有,就會使用checkpoint資料,而不用重新計算。也即是說,checkpoint可以視為cache的保障機制,如果cache失敗,就使用checkpoint的資料。
使用checkpoint的**優點在於提高了Spark作業的可靠性,一旦快取出現問題,不需要重新計算資料,缺點在於,checkpoint時需要將資料寫入HDFS等檔案系統,對效能的消耗較大**。
持久化設定如下:
```java
sc.setCheckpointDir(‘HDFS’)
rdd.cache/persist(memory_and_disk)
rdd.checkpoint
```
#### 10. 使用廣播變數
預設情況下,task中的運算元中如果使用了外部的變數,每個task都會獲取一份變數的複本,這就造成了記憶體的極大消耗。一方面,如果後續對RDD進行持久化,可能就無法將RDD資料存入記憶體,只能寫入磁碟,磁碟IO將會嚴重消耗效能;另一方面,task在建立物件的時候,也許會發現堆記憶體無法存放新建立的物件,這就會導致頻繁的GC,GC會導致工作執行緒停止,進而導致Spark暫停工作一段時間,嚴重影響Spark效能。
假設當前任務配置了20個Executor,指定500個task,有一個20M的變數被所有task共用,此時會在500個task中產生500個副本,耗費叢集10G的記憶體,如果使用了廣播變數, 那麼每個Executor儲存一個副本,一共消耗400M記憶體,記憶體消耗減少了5倍。
廣播變數在每個Executor儲存一個副本,此Executor的所有task共用此廣播變數,這讓變數產生的副本數量大大減少。
在初始階段,廣播變數只在Driver中有一份副本。task在執行的時候,想要使用廣播變數中的資料,此時首先會在自己本地的Executor對應的BlockManager中嘗試獲取變數,如果本地沒有,BlockManager就會從Driver或者其他節點的BlockManager上遠端拉取變數的複本,並由本地的BlockManager進行管理;之後此Executor的所有task都會直接從本地的BlockManager中獲取變數。
對於多個Task可能會共用的資料可以廣播到每個Executor上:
```java
val 廣播變數名= sc.broadcast(會被各個Task用到的變數,即需要廣播的變數)
廣播變數名.value//獲取廣播變數
```
#### 11. 使用Kryo序列化
預設情況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在運算元中使用的變數實現Serializable介面即可,但是,Java序列化機制的效率不高,序列化速度慢並且序列化後的資料所佔用的空間依然較大。
Spark官方宣稱Kryo序列化機制比Java序列化機制**效能提高10倍左右**,Spark之所以沒有預設使用Kryo作為序列化類庫,是因為**它不支援所有物件的序列化**,同時Kryo需要使用者在使用前註冊需要序列化的型別,不夠方便,**但從Spark 2.0.0版本開始,簡單型別、簡單型別陣列、字串型別的Shuffling RDDs 已經預設使用Kryo序列化方式了**。
Kryo序列化註冊方式的程式碼如下:
```java
public class MyKryoRegistrator implements KryoRegistrator{
@Override
public void registerClasses(Kryo kryo){
kryo.register(StartupReportLogs.class);
}
}
```
配置Kryo序列化方式的程式碼如下:
```java
//建立SparkConf物件
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化庫中註冊自定義的類集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator");
```
> 本文首發於公眾號:五分鐘學大資料,回覆【666】即可獲得全套大資料筆面