1. 程式人生 > >Spark:對資料傾斜的八種處理方法

Spark:對資料傾斜的八種處理方法

目錄

1. 什麼是資料傾斜

資料傾斜是一種很常見的問題(依據二八定律),簡單來說,比方WordCount中某個Key對應的資料量非常大的話,就會產生資料傾斜,導致兩個後果:

  1. OOM(單或少數的節點);
  2. 拖慢整個Job執行時間(其他已經完成的節點都在等這個還在做的節點)。

2. 解決資料傾斜需要

  1. 搞定 Shuffle;
  2. 搞定業務場景;
  3. 搞定 CPU core 的使用情況;(這裡的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作執行緒。
  4. 搞定 OOM(記憶體溢位) 的根本原因等:一般都因為資料傾斜(某task任務的資料量過大,GC壓力大,和Kafka不同在於Kafka的記憶體不經過JVM,其基於Linux的Page)。

3. 導致Spark資料傾斜的本質

Shuffle時,需將各節點的相同key的資料拉取到某節點上的一個task來處理,若某個key對應的資料量很大就會發生資料傾斜。比方說大部分key對應10條資料,某key對應10萬條,大部分task只會被分配10條資料,很快做完,個別task分配10萬條資料,不僅執行時間長,且整個stage的作業時間由最慢的task決定

資料傾斜只會發生在Shuffle過程,以下演算法可能觸發Shuffle操作: 

distinct:

distinct的操作其實是把原RDD進行map操作,根據原來的key-value生成為key,value使用null來替換,並對新生成的RDD執行reduceByKey的操作,也就是說,Distinct的操作是根據key與value一起計算不重複的結果.只有兩個記錄中key與value都不重複才算是不重複的資料。

groupByKey:

groupByKey會將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式, 有點類似於sql中的groupby,例如類似於mysql中的group_concat

reduceByKey:

reduceByKey,就是將key相同的鍵值對,按照Function進行計算。如程式碼中就是將key相同的各value進行累加。得到的結果就是類似於[(key2,2), (key3,1), (key1,2)] 形式。

aggregateByKey 函式:

對PairRDD中相同的Key值進行聚合操作,在聚合過程中同樣使用了一箇中立的初始值。和aggregate函式類似,aggregateByKey返回值的型別不需要和RDD中value的型別一致

。因為aggregateByKey是對相同Key中的值進行聚合操作,所以aggregateByKey'函式最終返回的型別還是PairRDD,對應的結果是Key和聚合後的值,而aggregate函式直接返回的是非RDD的結果。

join:

join類似於SQL的inner join操作,返回結果是前面和後面集合中配對成功的,過濾掉關聯不上的。

cogroup:

對兩個RDD中的kv元素,每個RDD中相同key中的元素分別聚合成一個集合。與reduceByKey不同的是針對兩個RDD中相同的key的元素進行合併。

repartition:

返回一個恰好有numPartitions個分割槽的RDD,可以增加或者減少此RDD的並行度。內部,這將使用shuffle重新分佈資料,如果你減少分割槽數,考慮使用coalesce,這樣可以避免執行shuffle

以上等等。

4. 定位最慢的Task所處的原始碼位置

步驟一: 看資料傾斜發生在哪個stage(也就是看以上運算元出現在哪個階段)yarn-client模式下檢視本地log或Spark Web UI中當前執行的是哪個stage;yarn-cluster模式下,通過Spark Web UI檢視執行到了哪個Stage。 
主要看最慢的Stage各task分配的資料量,來確定是否是資料傾斜。

步驟二:根據Stage劃分,推算傾斜發生的程式碼(必然有Shuffle類運算元)。簡單實用方法:只要看到shuffle類運算元或Spark SQL的SQL語句會有Shuffle類的運算元的句子,就可以知道該地方劃分為前後兩個Stage。(用Python的PySpark介面,Spark Web UI會檢視task在原始碼中的行數,Java或者Scala 同理。)

5. 解決方案

方案一:使用Hive ETL預處理

  • 場景:若Hive表中資料不均勻,且業務中會頻繁用Spark對Hive表分析;
  • 思路:用Hive對資料預處理(對key聚合等操作),原本是Spark對Hive的原表操作,現在就是對Hive預處理後的表操作;
  • 原理:從根源解決了資料傾斜,規避了了Spark進行Shuffle類運算元操作。但Hive ETL中進行聚合等操作會發生資料傾斜,只是把慢轉移給了Hive ETL;
  • 優點:方便,效果好,規避了Spark資料傾斜;
  • 缺點:治標不治本,Hive ETL會資料傾斜。

方案二:過濾導致傾斜的key

  • 場景:發生傾斜的key很少且不重要;
  • 思路:對發生傾斜的key過濾掉。比方在Spark SQL中用where子句或filter過濾,若每次作業執行,需要動態判定可使用sample運算元對RDD取樣後取資料量最多的key過濾;
  • 原理:對傾斜的key過濾後,這些key便不會參與後面的計算,從本質上消除資料傾斜;
  • 優點:簡單,效果明顯;
  • 缺點:適用場景少,實際中導致傾斜的key很多。

方案三:提高Shuffle操作並行度

  • 場景:任何場景都可以,優先選擇的最簡單方案;
  • 思路:
  • 對RDD操作的Shuffle運算元傳入一個引數,也就是設定Shuffle運算元執行時的Shuffle read task數量。
  • 對於Spark SQL的Shuffle類語句(如group by,join)即spark.sql.shuffle.partitions,代表shuffle read task的並行度,預設值是200可修改
  • 原理:增大shuffle read task引數值,讓每個task處理比原來更少的資料;
  • 優點:簡單,有效;
  • 缺點:緩解的效果很有限。

方案四:兩階段聚合(區域性聚合+全域性聚合)

  • 場景:對RDD進行reduceByKey等聚合類shuffle運算元,SparkSQL的groupBy做分組聚合這兩種情況
  • 思路:首先通過map給每個key打上n以內的隨機數的字首並進行區域性聚合,即(hello, 1) (hello, 1) (hello, 1) (hello, 1)變為(1_hello, 1) (1_hello, 1) (2_hello, 1),並進行reduceByKey的區域性聚合,然後再次map將key的字首隨機數去掉再次進行全域性聚合;
  • 原理:對原本相同的key進行隨機數附加,變成不同key,讓原本一個task處理的資料分攤到多個task做區域性聚合,規避單task資料過量。之後再去隨機字首進行全域性聚合;
  • 優點:效果非常好(對聚合類Shuffle操作的傾斜問題);
  • 缺點:範圍窄(僅適用於聚合類的Shuffle操作,join類的Shuffle還需其它方案)。

方案五:將reduce join轉為map join

  • 場景:對RDD或Spark SQL使用join類操作或語句,且join操作的RDD或表比較小(百兆或1,2G);
  • 思路:使用broadcast和map類運算元實現join的功能替代原本的join,徹底規避shuffle。對較小RDD直接collect到記憶體,並建立broadcast變數;並對另外一個RDD執行map類運算元,在該運算元的函式中,從broadcast變數(collect出的較小RDD)與當前RDD中的每條資料依次比對key,相同的key執行你需要方式的join;
  • 原理:若RDD較小,可採用廣播小的RDD,並對大的RDD進行map,來實現與join同樣的效果。簡而言之,用broadcast-map代替join,規避join帶來的shuffle(無Shuffle無傾斜)
  • 優點:效果很好(對join操作導致的傾斜),根治;
  • 缺點:適用場景小(大表+小表),廣播(driver和executor節點都會駐留小表資料)小表也耗記憶體。

方案六:取樣傾斜key並分拆join操作

  • 場景:兩個較大的(無法採用方案五)RDD/Hive表進行join時,且一個RDD/Hive表中少數key資料量過大,另一個RDD/Hive表的key分佈較均勻(RDD中兩者之一有一個更傾斜);
  • 思路:
    • 1. 對更傾斜rdd1進行取樣(RDD.sample)並統計出資料量最大的幾個key;
    • 2. 對這幾個傾斜的key從原本rdd1中拆出形成一個單獨的rdd1_1,並打上0~n的隨機數字首,被拆分的原rdd1的另一部分(不包含傾斜key)又形成一個新rdd1_2;
    • 3. 對rdd2過濾出rdd1傾斜的key,得到rdd2_1,並將其中每條資料擴n倍,對每條資料按順序附加0~n的字首,被拆分出key的rdd2也獨立形成另一個rdd2_2; 
      【個人認為,這裡擴了n倍,最後union完還需要將每個傾斜key對應的value減去(n-1)】
    • 4. 將加了隨機字首的rdd1_1和rdd2_1進行join(此時原本傾斜的key被打散n份並被分散到更多的task中進行join); 
      【個人認為,這裡應該做兩次join,兩次join中間有一個map去字首】
    • 5. 另外兩個普通的RDD(rdd1_2、rdd2_2)照常join;
    • 6. 最後將兩次join的結果用union結合得到最終的join結果。
  • 原理:對join導致的傾斜是因為某幾個key,可將原本RDD中的傾斜key拆分出原RDD得到新RDD,並以加隨機字首的方式打散n份做join,將傾斜key對應的大量資料分攤到更多task上來規避傾斜;
  • 優點:前提是join導致的傾斜(某幾個key傾斜),避免佔用過多記憶體(只需對少數傾斜key擴容n倍);
  • 缺點:對過多傾斜key不適用。

方案七:用隨機字首和擴容RDD進行join

  • 場景:RDD中有大量key導致傾斜;
  • 思路:與方案六類似。 
    1. 檢視RDD/Hive表中資料分佈並找到造成傾斜的RDD/表; 
    2. 對傾斜RDD中的每條資料打上n以內的隨機數字首; 
    3. 對另外一個正常RDD的每條資料擴容n倍,擴容出的每條資料依次打上0到n的字首; 
    4. 對處理後的兩個RDD進行join。
  • 原理:與方案六隻有唯一不同在於這裡對不傾斜RDD中所有資料進行擴大n倍,而不是找出傾斜key進行擴容(這是方案六);
  • 優點:對join類的資料傾斜都可處理,效果非常顯著;
  • 缺點:緩解,擴容需要大記憶體。 
    【個人認為,這裡和方案六一樣,也需要對擴容的key對應的value最後減去(n-1),除非只需大小關係,對值沒有要求】

方案八:多種方案組合

實際中,需綜合著對業務全盤考慮,可先用方案一和二進行預處理,同時在需要Shuffle的操作提升Shuffle的並行度,最後針對資料分佈選擇後面方案中的一種或多種。實際中需要對資料和方案思路理解靈活應用。