第135-141課:Spark面試經典系列之資料傾斜
Spark面試經典系列之資料傾斜
Spark面試經典系列之資料傾斜:資料傾斜之痛
1、Spark效能真正的殺手
2、資料傾斜之痛
資料傾斜兩大直接致命性的後果:
1、OOM,一般OOM都是由於資料傾斜所致
2、速度變慢
資料傾斜基本形態特徵:個別Task處理大量資料
資料傾斜的定位:
1、Web UI,可以清晰看見哪些Task執行的資料量大小
2、Log,Log的一個好處是可以清晰的告訴是哪一行出現問題OOM,同時可以清晰的看到在具體哪個Stage出現了資料傾斜(資料傾斜一般會在Shuffle過程中產生的),從而定位具體Shuffle的程式碼。也有可能發現絕大多數Task非常快,但是個別Task非常慢。
3、程式碼走讀,重點看join、groupByKey、reduceByKey等關鍵程式碼
4、對資料特徵分佈進行分析;
解決原理和方法總論
1、Spark資料傾斜解決的原理總論
2、Spark資料傾斜解決方法總論
使資料膨脹,tachyon,複用RDD
Map端Reduce及問題思考
1、Spark資料傾斜解決之Map端Reduce
2、Map端Reduce的問題思考
給小的一段broadcast,然後在大的一端使用mapPartition。
如果資料量太大,有可能引起OOM
取樣分而治之解決方案
1、取樣演算法解決資料傾斜的思想
2、取樣演算法在Spark資料傾斜中的具體操作
某個或某幾個Key的Value非常大,從而導致資料傾斜
RDD1和RDD2進行join操作,其中我們採用取樣的方式發現RDD1中有嚴重的資料傾斜的Key
第一步:採用Spark RDD中提供的取樣介面,我們可以很方便的對全體(例如100億條資料)進行取樣,然後基於取樣的資料,我們可以計算出哪個(哪些)Key的Value個數最多;
第二步:把全體資料分成兩部分,即把原來RDD1變成RDD11和RDD12,其中RDD11代表導致資料傾斜的Key,RDD12中包含的是不會產生資料傾斜的Key;
第三步:把RDD11和RDD2進行join操作,且把RDD12和RDD2進行join操作,然後把分別join操作後的結果進行Union操作,從而得出和RDD1與RDD2直接進行join操作相同的結果
Spark自己的機制保證的不會產生資料傾斜。
上述流程中:
第一種情況:如果RDD11中的資料量不是很多,可以採用map端的join操作,避免了shuffle和資料傾斜。
第二種情況:如果RDD11中的資料量特別多,此時之所以能夠緩解資料傾斜是因為採用了Spark Core天然的並行機制對RDD11中的同樣一個Key的資料進行了拆分。從而達到讓原本傾斜的Key分散到不同的Task的目的,就緩解了資料傾斜。
思考:在上述過程中如果把傾斜的Key加上隨機數,會怎麼樣?
增加隨機數,並行Task數量可能增加,具體是如何操作的?
RDD11中傾斜的Key加上1000以內的隨機數,然後和RDD2進行join操作?不行!此時一定需要把RDD11中的Key在RDD2中的相同的Key進行1000以內的隨機數,然後再進行join操作,這樣做的好處:讓傾斜的Key更加不傾斜,在實際生產環境下,會極大的解決在兩個進行join的RDD數量都很大且其中一個RDD有一個或者兩三個明顯傾斜的Key的情況下的資料傾斜問題。
對於兩個RDD資料量都很大且傾斜的Key特別多如何解決?
1、資料量都很大且傾斜的Key多的情況
2、此種情況下具體操作步驟
兩個RDD資料都特別多且傾斜的Key成千上萬個,該如何解決資料傾斜的問題?
初步的想法:在傾斜的Key上面加上隨機數
該想法的原因:shuffle的時候把key的資料分到不同的task裡去
但是現在的傾斜的key非常多,成千上萬,所以如果說取樣找出傾斜的key的話並不是一個非常好的想法
擴容?
首先,什麼是擴容?就是把該RDD中的每一條資料變成5條、10條、20條等,例如RDD中原來是10億條資料,擴容後可能變成1000億條資料;
其次,如何做到擴容?flatMap中對要進行擴容的每一條資料都通過0~N-1個不同的字首變成N條資料(例如變成)
問題:N的值可以隨便取嗎?需要考慮當前程式能夠使用的Core的數量
答案:N的數值一般不能取的太大,通常小於50,否則會對磁碟、記憶體、網路都會形成極大負擔,例如會造成OOM
N這個數值取成10和1000除了OOM等不同以外,是否還有其他影響呢?其實N的數值的大小還會對資料傾斜的解決程度構成直接的影響!N越大,越不容易傾斜,但是也會佔用更多的記憶體、磁碟、網路以及(不必要的)消耗更多的CPU時間
模擬程式碼:
RDD1 join RDD2
rdd22 = RDD2.flatMap{
for(0 to 9) {
0_item
}
}
rdd11 = RDD1.map{
Random(10)
random_item
}
result = rdd11.join(rdd22)
result.map{
item_1.split 去掉字首
}
並行度的深度使用
1、並行度的初級使用
2、並行度的高階使用
用並行度解決資料傾斜的基本應用:例如reduceByKey
改變並行度之所以能夠改善資料傾斜的原因在於,如果某個Task有100個Key且資料量特別大,就極有可能導致OOM或者任務執行特別緩慢,此時如果把並行度變大,則可以分解該Task的資料量,例如把原本該Task的100個Key分解給10個Task,這個就可以減少每個Task的資料量,從而有可能解決OOM和任務慢的問題。
對於reduceByKey而言,你可以傳入並行度的引數,也可以自定義Partitioner
增加Executor:改變計算資源,從僅僅資料傾斜的角度來看並不能夠直接去解決資料傾斜的問題,但是也有好處,好處是可以同時併發執行更多的Task,結果是可能加快了執行速度。
用並行度解決資料傾斜的高階使用:例如reduceByKey
假設說有傾斜的Key,我們給所有的Key加上一個隨機數,然後進行reduceByKey操作;此時同一個Key會有不同的隨機數字首,在進行reduceByKey操作的時候原來的一個非常大的傾斜的Key就分而治之變成若干個更小的Key,不過此時結果和原來不一樣,怎麼破?進行map操作,目的是把隨機數字首去掉,然後再次進行reduceByKey操作。(當然,如果你很無聊,可以再次做隨機數字首),這樣我們就可以把原本傾斜的Key通過分而治之方案分散開來,最後又進行了全域性聚合,在這裡的本質還是通過改變並行度去解決資料傾斜的問題。
解決方案的“銀彈”是什麼?
1、資料傾斜解決方案總結
2、方案之外的方案
3、資料傾斜解決方案的“銀彈”?
逃離Spark技術本身之外如何解決資料傾斜的問題?
之所以會有這樣的想法,是因為從結果上來看,資料傾斜的產生來自於資料和資料的處理技術,前面幾節課和大家分享都是資料的處理技術層面如何解決資料傾斜,因此,我們現在需要回到資料的層面去解決資料傾斜的問題。
資料本身就是Key-Value的存在方式,所謂的資料傾斜就是說某(幾)個Key的Values特別多,所以如果要解決資料傾斜,實質上是解決單一的Key的Values的個數特別多的情況。新的資料傾斜解決方案由此誕生了。
1、把一個大的Key-Values資料分解成為Key-subKey-Values的方式
2、預先和其他的表進行join,將資料傾斜提前到上游的Hive ETL
3、可以把大的Key-Values中的Values組拼成為一個字串,從而形成只有一個元素的Key-Value。
4、加一箇中間適配層,當資料進來的時候進行Key的統計和動態排名,基於該排名動態調整Key分佈
假如10萬個Key都發生了資料傾斜,如何解決呢?此時一般就是加記憶體和Core