spark 一些運算元的使用及優化
阿新 • • 發佈:2019-02-04
1、MapPartitions
spark中,最基本的原則,就是每個task處理一個RDD的partition。
MapPartitions操作的優點:
如果是普通的map,比如一個partition中有1萬條資料;ok,那麼你的function要執行和計算1萬次。
但是,使用MapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition資料。只要執行一次就可以了,效能比較高。
MapPartitions的缺點:一定是有的。
如果是普通的map操作,一次function的執行就處理一條資料;那麼如果記憶體不夠用的情況下,比如處理了1千條資料了,那麼這個時候記憶體不夠了,那麼就可以將已經處理完的1千條資料從記憶體裡面垃圾回收掉,或者用其他方法,騰出空間來吧。
所以說普通的map操作通常不會導致記憶體的OOM異常。
但是MapPartitions操作,對於大量資料來說,比如甚至一個partition,100萬資料,一次傳入一個function以後,那麼可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來,可能就OOM,記憶體溢位。
什麼時候比較適合用MapPartitions系列操作,就是說,資料量不是特別大的時候,都可以用這種MapPartitions系列操作,效能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經有一次效能調優),12分鐘。10分鐘->9分鐘。
但是也有過出問題的經驗,MapPartitions只要一用,直接OOM,記憶體溢位,崩潰。
在專案中,自己先去估算一下RDD的資料量,以及每個partition的量,還有自己分配給每個executor的記憶體資源。看看一下子記憶體容納所有的partition資料,行不行。如果行,可以試一下,能跑通就好。效能肯定是有提升的。
2、 reduceByKey
transformation 操作,類似於MapReduce 中的combiner
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect()
reduceByKey,相較於普通的shuffle操作(比如groupByKey),它的一個特點,就是說,
會進行map端的本地聚合。
對map端給下個stage每個task建立的輸出檔案中,寫資料之前,就會進行本地的combiner操作,
也就是說對每一個key,對應的values,都會執行你的運算元函式(_ + _)
用reduceByKey對效能的提升:
1、在本地進行聚合以後,在map端的資料量就變少了,減少磁碟IO。而且可以減少磁碟空間的佔用。
2、下一個stage,拉取資料的量,也就變少了。減少網路的資料傳輸的效能消耗。
3、在reduce端進行資料快取的記憶體佔用變少了。
4、reduce端,要進行聚合的資料量也變少了。
總結:
reduceByKey在什麼情況下使用呢?
1、非常普通的,比如說,就是要實現類似於wordcount程式一樣的,對每個key對應的值,
進行某種資料公式或者演算法的計算(累加、累乘)
2、對於一些類似於要對每個key進行一些字串拼接的這種較為複雜的操作,可以自己衡量一下,
其實有時,也是可以使用reduceByKey來實現的。但是不太好實現。如果真能夠實現出來,
對效能絕對是有幫助的。(shuffle基本上就佔了整個Spark作業的90%以上的效能消耗,
主要能對shuffle進行一定的調優,都是有價值的)
我們的程式沒有那麼去做!但是把這個當作一個課後思考題給大家,看大家能不能對我們的聚合session
的操作應用上ReduceByKey來提高效能!
運算元優化 repartiton
運算元調優之使用repartition解決Spark SQL低並行度的效能問題
spark.sql.shuffle.partitions 調整DataFrame的shuffle並行度
spark.default.parallelism 調整RDD的shuffle並行度
並行度:之前說過,並行度是自己可以調節,或者說是設定的。
1、spark.default.parallelism
2、textFile(),傳入第二個引數,指定partition數量(比較少用)
咱們的專案程式碼中,沒有設定並行度,實際上,在生產環境中,是最好自己設定一下的。
官網有推薦的設定方式,你的spark-submit指令碼中,會指定你的application總共要啟動多少個executor,
100個;每個executor多少個cpu core,2~3個;總共application,有cpu core,200個。
官方推薦,根據你的application的總cpu core數量(在spark-submit中可以指定,200個),
自己手動設定spark.default.parallelism引數,指定為cpu core總數的2~3倍。400~600個並行度。600。
承上啟下
你設定的這個並行度,在哪些情況下會生效?哪些情況下,不會生效?
如果你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application預設所有stage的並行度
都是你設定的那個引數。(除非你使用coalesce運算元縮減過partition數量)
問題來了,Spark SQL,用了。用Spark SQL的那個stage的並行度,你沒法自己指定。
Spark SQL自己會預設根據Hive表對應的hdfs檔案的block,自動設定Spark SQL查詢所在的那個stage的
並行度。你自己通過spark.default.parallelism引數指定的並行度,只會在沒有Spark SQL的stage中生效。
比如你第一個stage,用了Spark SQL從hive表中查詢出了一些資料,然後做了一些transformation操作,
接著做了一個shuffle操作(groupByKey);下一個stage,在shuffle操作之後,
做了一些transformation操作。hive表,對應了一個hdfs檔案,有20個block;
你自己設定了spark.default.parallelism引數為100。
你的第一個stage的並行度,是不受你的控制的,就只有20個task;第二個stage,
才會變成你自己設定的那個並行度,100。
問題在哪裡?
Spark SQL預設情況下,它的那個並行度,咱們沒法設定。可能導致的問題,也許沒什麼問題,
也許很有問題。Spark SQL所在的那個stage中,後面的那些transformation操作,
可能會有非常複雜的業務邏輯,甚至說複雜的演算法。如果你的Spark SQL預設把task數量設定的很少,
20個,然後每個task要處理為數不少的資料量,然後還要執行特別複雜的演算法。
這個時候,就會導致第一個stage的速度,特別慢。第二個stage,1000個task,刷刷刷,非常快。
解決上述Spark SQL無法設定並行度和task數量的辦法,是什麼呢?
repartition運算元,你用Spark SQL這一步的並行度和task數量,肯定是沒有辦法去改變了。但是呢,
可以將你用Spark SQL查詢出來的RDD,使用repartition運算元,去重新進行分割槽,
此時可以分割槽成多個partition,比如從20個partition,分割槽成100個。
然後呢,從repartition以後的RDD,再往後,並行度和task數量,就會按照你預期的來了。
就可以避免跟Spark SQL繫結在一個stage中的運算元,只能使用少量的task去處理大量資料以及
複雜的演算法邏輯。
這裡就很有可能發生上面說的問題
比如說,Spark SQl預設就給第一個stage設定了20個task,但是根據你的資料量以及演算法的複雜度
實際上,你需要1000個task去並行執行
所以說,在這裡,就可以對Spark SQL剛剛查詢出來的RDD執行repartition重分割槽操作
3、 filter
預設情況下,經過了這種filter之後,RDD中的每個partition的資料量,可能都不太一樣了。
(原本每個partition的資料量可能是差不多的)
問題:
1、每個partition資料量變少了,但是在後面進行處理的時候,還是要跟partition數量一樣數量的task,
來進行處理;有點浪費task計算資源。
2、每個partition的資料量不一樣,會導致後面的每個task處理每個partition的時候,
每個task要處理的資料量就不同,這個時候很容易發生什麼問題?
資料傾斜。。。。
比如說,第二個partition的資料量才100;但是第三個partition的資料量是900;
那麼在後面的task處理邏輯一樣的情況下,不同的task要處理的資料量可能差別達到了9倍,
甚至10倍以上;同樣也就導致了速度的差別在9倍,甚至10倍以上。
這樣的話呢,就會導致有些task執行的速度很快;有些task執行的速度很慢。這,就是資料傾斜。
針對上述的兩個問題,我們希望應該能夠怎麼樣?
1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為資料量變少了,
那麼partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。
那麼就只要用後面的2個task來處理即可。就不會造成task計算資源的浪費。
(不必要,針對只有一點點資料的partition,還去啟動一個task來計算)
2、針對第二個問題,其實解決方案跟第一個問題是一樣的;也是去壓縮partition,
儘量讓每個partition的資料量差不多。那麼這樣的話,後面的task分配到的partition的資料量
也就差不多。不會造成有的task執行速度特別慢,有的task執行速度特別快。避免了資料傾斜的問題。
有了解決問題的思路之後,接下來,我們該怎麼來做呢?實現?
4、coalesce
主要就是用於在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量。
減少partition的數量,而且讓每個partition的資料量都儘量均勻緊湊。
從而便於後面的task進行計算操作,在某種程度上,能夠一定程度的提升效能。
說明一下:
這兒,是對完整的資料進行了filter過濾,過濾出來點選行為的資料點選行為的資料其實只佔總資料的一小部分(譬如 20%)
所以過濾以後的RDD,每個partition的資料量,很有可能跟我們之前說的一樣,會很不均勻而且資料量肯定會變少很多
所以針對這種情況,還是比較合適用一下coalesce運算元的,在filter過後去減少partition的數量
coalesce(100)
這個就是說經過filter之後再把資料壓縮的比較緊湊,壓縮為100個數據分片,也就是形成了 100 個 partition
對這個coalesce操作做一個說明
如果執行模式都是local模式,主要是用來測試,所以local模式下,
不用去設定分割槽和並行度的數量
local模式自己本身就是程序內模擬的叢集來執行,本身效能就很高
而且對並行度、partition數量都有一定的內部的優化
這裡我們再自己去設定,就有點畫蛇添足
但是就是跟大家說明一下,coalesce運算元的使用,即可
5、foreachPartition
foreach的寫庫原理
預設的foreach的效能缺陷在哪裡?
首先,對於每條資料,都要單獨去呼叫一次function,task為每個資料,都要去執行一次function函式。
如果100萬條資料,(一個partition),呼叫100萬次。效能比較差。
另外一個非常非常重要的一點
如果每個資料,你都去建立一個數據庫連線的話,那麼你就得建立100萬次資料庫連線。
但是要注意的是,資料庫連線的建立和銷燬,都是非常非常消耗效能的。雖然我們之前已經用了
資料庫連線池,只是建立了固定數量的資料庫連線。
你還是得多次通過資料庫連線,往資料庫(MySQL)傳送一條SQL語句,然後mysql需要去執行這條SQL語句。
如果有100萬條資料,那麼就是100萬次傳送SQL語句。
以上兩點(資料庫連線,多次傳送SQL語句),都是非常消耗效能的。
foreachPartition,在生產環境中,通常來說,都使用foreachPartition來寫資料庫的
使用批處理操作(一條SQL和多組引數)
傳送一條SQL語句,傳送一次
一下子就批量插入100萬條資料。
用了foreachPartition運算元之後,好處在哪裡?
1、對於我們寫的function函式,就呼叫一次,一次傳入一個partition所有的資料
2、主要建立或者獲取一個數據庫連線就可以
3、只要向資料庫傳送一次SQL語句和多組引數即可
在實際生產環境中,清一色,都是使用foreachPartition操作;但是有個問題,跟mapPartitions操作一樣,
如果一個partition的數量真的特別特別大,比如真的是100萬,那基本上就不太靠譜了。
一下子進來,很有可能會發生OOM,記憶體溢位的問題。
一組資料的對比:生產環境
一個partition大概是1千條左右
用foreach,跟用foreachPartition,效能的提升達到了2~3分鐘。
實際專案操作:
首先JDBCHelper裡面已經封裝好了一次批量插入操作!
批量插入session detail
唯一不一樣的是我們需要ISessionDetailDAO裡面去實現一個批量插入
List<SessionDetail> sessionDetails