spark運算元調優
1、MapPartitions提升Map類操作效能
spark中,最基本的原則,就是每個task處理一個RDD的partition。
1.1 MapPartitions的優缺點
MapPartitions操作的優點:
如果是普通的map,比如一個partition中有1萬條資料。ok,那麼你的function要執行和計算1萬次。
但是,使用MapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition資料。只要執行一次就可以了,效能比較高。
MapPartitions的缺點:
如果是普通的map操作,一次function的執行就處理一條資料。那麼如果記憶體不夠用的情況下,比如處理了1千條資料了,那麼這個時候記憶體不夠了,那麼就可以將已經處理完的1千條資料從記憶體裡面垃圾回收掉,或者用其他方法,騰出空間來吧。
所以說普通的map操作通常不會導致記憶體的OOM異常。
但是MapPartitions操作,對於大量資料來說,比如甚至一個partition,100萬資料,一次傳入一個function以後,那麼可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來,可能就OOM,記憶體溢位。
1.2 MapPartitions使用場景
當分析的資料量不是特別大的時候,都可以用這種MapPartitions系列操作,效能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經有一次效能調優),12分鐘。10分鐘->9分鐘。
但是也有過出問題的經驗,MapPartitions只要一用,直接OOM,記憶體溢位,崩潰。
在專案中,自己先去估算一下RDD的資料量,以及每個partition的量,還有自己分配給每個executor的記憶體資源。看看一下子記憶體容納所有的partition資料行不行。如果行,可以試一下,能跑通就好。效能肯定是有提升的。但是試了以後,發現OOM了,那就放棄吧。
2、filter過後使用coalesce減少分割槽
2.1 出現問題
預設情況下,經過了filter之後,RDD中的每個partition的資料量,可能都不太一樣了。(原本每個partition的資料量可能是差不多的)
可能出現的問題:
1、每個partition資料量變少了,但是在後面進行處理的時候,還是要跟partition數量一樣數量的task,來進行處理,有點浪費task計算資源。
2、每個partition的資料量不一樣,會導致後面的每個task處理每個partition的時候,每個task要處理的資料量就不同,這樣就會導致有些task執行的速度很快,有些task執行的速度很慢。這就是資料傾斜。
針對上述的兩個問題,我們希望應該能夠怎麼樣?
1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為資料量變少了,那麼partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。那麼就只要用後面的2個task來處理即可。就不會造成task計算資源的浪費。(不必要,針對只有一點點資料的partition,還去啟動一個task來計算)
2、針對第二個問題,其實解決方案跟第一個問題是一樣的,也是去壓縮partition,儘量讓每個partition的資料量差不多。那麼後面的task分配到的partition的資料量也就差不多。不會造成有的task執行速度特別慢,有的task執行速度特別快。避免了資料傾斜的問題。
2.2 解決問題方法
呼叫coalesce運算元
主要就是用於在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量,而且讓每個partition的資料量都儘量均勻緊湊。從而便於後面的task進行計算操作,在某種程度上,能夠一定程度的提升效能。
3、使用foreachPartition優化寫資料庫效能
3.1 預設的foreach的效能缺陷在哪裡?
首先,對於每條資料,都要單獨去呼叫一次function,task為每個資料,都要去執行一次function函式。
如果100萬條資料,(一個partition),呼叫100萬次。效能比較差。
另外一個非常非常重要的一點
如果每個資料,你都去建立一個數據庫連線的話,那麼你就得建立100萬次資料庫連線。
但是要注意的是,資料庫連線的建立和銷燬,都是非常非常消耗效能的。雖然我們之前已經用了資料庫連線池,只是建立了固定數量的資料庫連線。
你還是得多次通過資料庫連線,往資料庫(MySQL)傳送一條SQL語句,然後MySQL需要去執行這條SQL語句。如果有100萬條資料,那麼就是100萬次傳送SQL語句。
以上兩點(資料庫連線,多次傳送SQL語句),都是非常消耗效能的。
3.2 用了foreachPartition運算元之後,好處在哪裡?
1、對於我們寫的function函式,就呼叫一次,一次傳入一個partition所有的資料。
2、主要建立或者獲取一個數據庫連線就可以。
3、只要向資料庫傳送一次SQL語句和多組引數即可。
注意,與mapPartitions操作一樣,如果一個partition的數量真的特別特別大,比如是100萬,那基本上就不太靠譜了。很有可能會發生OOM,記憶體溢位的問題。
4、使用repartition解決Spark SQL低並行度的效能問題
4.1 設定並行度
並行度:之前說過,並行度是設定的:
1、spark.default.parallelism
2、textFile(),傳入第二個引數,指定partition數量(比較少用)
在生產環境中,是最好設定一下並行度。官網有推薦的設定方式,根據你的application的總cpu core數量(在spark-submit中可以指定),自己手動設定spark.default.parallelism引數,指定為cpu core總數的2~3倍。
4.2 你設定的這個並行度,在哪些情況下會生效?什麼情況下不會生效?
如果你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application預設所有stage的並行度都是你設定的那個引數。(除非你使用coalesce運算元縮減過partition數量)。
問題來了,用Spark SQL的情況下,stage的並行度沒法自己指定。Spark SQL自己會預設根據hive表對應的hdfs檔案的block,自動設定Spark SQL查詢所在的那個stage的並行度。你自己通過spark.default.parallelism引數指定的並行度,只會在沒有Spark SQL的stage中生效。
比如你第一個stage,用了Spark SQL從hive表中查詢出了一些資料,然後做了一些transformation操作,接著做了一個shuffle操作(groupByKey)。下一個stage,在shuffle操作之後,做了一些transformation操作。hive表,對應了一個hdfs檔案,有20個block。你自己設定了spark.default.parallelism引數為100。
你的第一個stage的並行度,是不受你的控制的,就只有20個task。第二個stage,才會變成你自己設定的那個並行度,100。
5、reduceByKey本地聚合介紹
reduceByKey,相較於普通的shuffle操作(比如groupByKey),它的一個特點,就是說,會進行map端的本地聚合。對map端給下個stage每個task建立的輸出檔案中,寫資料之前,就會進行本地的combiner操作,也就是說對每一個key,對應的values,都會執行你的運算元函式(_ + _)
5.1 用reduceByKey對效能的提升
1、在本地進行聚合以後,在map端的資料量就變少了,減少磁碟IO。而且可以減少磁碟空間的佔用。
2、下一個stage,拉取資料的量,也就變少了。減少網路的資料傳輸的效能消耗。
3、在reduce端進行資料快取的記憶體佔用變少了。
4、reduce端,要進行聚合的資料量也變少了。
5.2 reduceByKey在什麼情況下使用呢?
1、非常普通的,比如說,就是要實現類似於wordcount程式一樣的,對每個key對應的值,進行某種資料公式或者演算法的計算(累加、類乘)。
2、對於一些類似於要對每個key進行一些字串拼接的這種較為複雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實現的。但是不太好實現。如果真能夠實現出來,對效能絕對是有幫助的。(shuffle基本上就佔了整個spark作業的90%以上的效能消耗,主要能對shuffle進行一定的調優,都是有價值的)