1. 程式人生 > >spark運算元調優

spark運算元調優

1、MapPartitions提升Map類操作效能

spark中,最基本的原則,就是每個task處理一個RDD的partition。

1.1  MapPartitions的優缺點

MapPartitions操作的優點:

如果是普通的map,比如一個partition中有1萬條資料。ok,那麼你的function要執行和計算1萬次。

但是,使用MapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition資料。只要執行一次就可以了,效能比較高。

MapPartitions的缺點:

如果是普通的map操作,一次function的執行就處理一條資料。那麼如果記憶體不夠用的情況下,比如處理了1千條資料了,那麼這個時候記憶體不夠了,那麼就可以將已經處理完的1千條資料從記憶體裡面垃圾回收掉,或者用其他方法,騰出空間來吧。

所以說普通的map操作通常不會導致記憶體的OOM異常。

但是MapPartitions操作,對於大量資料來說,比如甚至一個partition,100萬資料,一次傳入一個function以後,那麼可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來,可能就OOM,記憶體溢位。

1.2  MapPartitions使用場景

當分析的資料量不是特別大的時候,都可以用這種MapPartitions系列操作,效能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經有一次效能調優),12分鐘。10分鐘->9分鐘。

但是也有過出問題的經驗,MapPartitions只要一用,直接OOM,記憶體溢位,崩潰。

在專案中,自己先去估算一下RDD的資料量,以及每個partition的量,還有自己分配給每個executor的記憶體資源。看看一下子記憶體容納所有的partition資料行不行。如果行,可以試一下,能跑通就好。效能肯定是有提升的。但是試了以後,發現OOM了,那就放棄吧。

 

2、filter過後使用coalesce減少分割槽

2.1 出現問題

預設情況下,經過了filter之後,RDD中的每個partition的資料量,可能都不太一樣了。(原本每個partition的資料量可能是差不多的)

可能出現的問題:

1、每個partition資料量變少了,但是在後面進行處理的時候,還是要跟partition數量一樣數量的task,來進行處理,有點浪費task計算資源。

2、每個partition的資料量不一樣,會導致後面的每個task處理每個partition的時候,每個task要處理的資料量就不同,這樣就會導致有些task執行的速度很快,有些task執行的速度很慢。這就是資料傾斜。

針對上述的兩個問題,我們希望應該能夠怎麼樣?

1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為資料量變少了,那麼partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。那麼就只要用後面的2個task來處理即可。就不會造成task計算資源的浪費。(不必要,針對只有一點點資料的partition,還去啟動一個task來計算)

2、針對第二個問題,其實解決方案跟第一個問題是一樣的,也是去壓縮partition,儘量讓每個partition的資料量差不多。那麼後面的task分配到的partition的資料量也就差不多。不會造成有的task執行速度特別慢,有的task執行速度特別快。避免了資料傾斜的問題。

2.2  解決問題方法

呼叫coalesce運算元

主要就是用於在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量,而且讓每個partition的資料量都儘量均勻緊湊。從而便於後面的task進行計算操作,在某種程度上,能夠一定程度的提升效能。

 

3、使用foreachPartition優化寫資料庫效能

3.1  預設的foreach的效能缺陷在哪裡?

首先,對於每條資料,都要單獨去呼叫一次function,task為每個資料,都要去執行一次function函式。

如果100萬條資料,(一個partition),呼叫100萬次。效能比較差。

另外一個非常非常重要的一點

如果每個資料,你都去建立一個數據庫連線的話,那麼你就得建立100萬次資料庫連線。

但是要注意的是,資料庫連線的建立和銷燬,都是非常非常消耗效能的。雖然我們之前已經用了資料庫連線池,只是建立了固定數量的資料庫連線。

你還是得多次通過資料庫連線,往資料庫(MySQL)傳送一條SQL語句,然後MySQL需要去執行這條SQL語句。如果有100萬條資料,那麼就是100萬次傳送SQL語句。

以上兩點(資料庫連線,多次傳送SQL語句),都是非常消耗效能的。

3.2  用了foreachPartition運算元之後,好處在哪裡?

1、對於我們寫的function函式,就呼叫一次,一次傳入一個partition所有的資料。

2、主要建立或者獲取一個數據庫連線就可以。

3、只要向資料庫傳送一次SQL語句和多組引數即可。

注意,與mapPartitions操作一樣,如果一個partition的數量真的特別特別大,比如是100萬,那基本上就不太靠譜了。很有可能會發生OOM,記憶體溢位的問題。

4、使用repartition解決Spark SQL低並行度的效能問題

4.1  設定並行度

並行度:之前說過,並行度是設定的:

1、spark.default.parallelism

2、textFile(),傳入第二個引數,指定partition數量(比較少用)

在生產環境中,是最好設定一下並行度。官網有推薦的設定方式,根據你的application的總cpu core數量(在spark-submit中可以指定),自己手動設定spark.default.parallelism引數,指定為cpu core總數的2~3倍。

4.2 你設定的這個並行度,在哪些情況下會生效?什麼情況下不會生效?

如果你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application預設所有stage的並行度都是你設定的那個引數。(除非你使用coalesce運算元縮減過partition數量)。

問題來了,用Spark SQL的情況下,stage的並行度沒法自己指定。Spark SQL自己會預設根據hive表對應的hdfs檔案的block,自動設定Spark SQL查詢所在的那個stage的並行度。你自己通過spark.default.parallelism引數指定的並行度,只會在沒有Spark SQL的stage中生效。

比如你第一個stage,用了Spark SQL從hive表中查詢出了一些資料,然後做了一些transformation操作,接著做了一個shuffle操作(groupByKey)。下一個stage,在shuffle操作之後,做了一些transformation操作。hive表,對應了一個hdfs檔案,有20個block。你自己設定了spark.default.parallelism引數為100。

你的第一個stage的並行度,是不受你的控制的,就只有20個task。第二個stage,才會變成你自己設定的那個並行度,100。

5、reduceByKey本地聚合介紹

reduceByKey,相較於普通的shuffle操作(比如groupByKey),它的一個特點,就是說,會進行map端的本地聚合。對map端給下個stage每個task建立的輸出檔案中,寫資料之前,就會進行本地的combiner操作,也就是說對每一個key,對應的values,都會執行你的運算元函式(_ + _)

5.1  用reduceByKey對效能的提升

1、在本地進行聚合以後,在map端的資料量就變少了,減少磁碟IO。而且可以減少磁碟空間的佔用。

2、下一個stage,拉取資料的量,也就變少了。減少網路的資料傳輸的效能消耗。

3、在reduce端進行資料快取的記憶體佔用變少了。

4、reduce端,要進行聚合的資料量也變少了。

5.2  reduceByKey在什麼情況下使用呢?

1、非常普通的,比如說,就是要實現類似於wordcount程式一樣的,對每個key對應的值,進行某種資料公式或者演算法的計算(累加、類乘)。

2、對於一些類似於要對每個key進行一些字串拼接的這種較為複雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實現的。但是不太好實現。如果真能夠實現出來,對效能絕對是有幫助的。(shuffle基本上就佔了整個spark作業的90%以上的效能消耗,主要能對shuffle進行一定的調優,都是有價值的)