1. 程式人生 > >優化-hive大資料傾斜總結

優化-hive大資料傾斜總結

在做Shuffle階段的優化過程中,遇到了資料傾斜的問題,造成了對一些情況下優化效果不明顯。主要是因為在Job完成後的所得到的Counters是整個Job的總和,優化是基於這些Counters得出的平均值,而由於資料傾斜的原因造成map處理資料量的差異過大,使得這些平均值能代表的價值降低。Hive的執行是分階段的,map處理資料量的差異取決於上一個stage的reduce輸出,所以如何將資料均勻的分配到各個reduce中,就是解決資料傾斜的根本所在。規避錯誤來更好的執行比解決錯誤更高效。在查看了一些資料後,總結如下。

1資料傾斜的原因

1.1操作:

關鍵詞

情形

後果

Join

其中一個表較小,

但是key集中

分發到某一個或幾個Reduce上的資料遠高於平均值

大表與大表,但是分桶的判斷欄位0值或空值過多

這些空值都由一個reduce處理,灰常慢

group by

group by 維度過小,

某值的數量過多

處理某值的reduce灰常耗時

Count Distinct

某特殊值過多

處理此特殊值的reduce耗時

1.2原因:

1)、key分佈不均勻

2)、業務資料本身的特性

3)、建表時考慮不周

4)、某些SQL語句本身就有資料傾斜

1.3表現:

任務進度長時間維持在99%(或100%),檢視任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的資料量和其他reduce差異過大。

單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大於平均時長。

2資料傾斜的解決方案

2.1引數調節:

hive.map.aggr=true

Map 端部分聚合,相當於Combiner

hive.groupby.skewindata=true

有資料傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的資料結果按照 Group By Key 分佈到 Reduce 中(這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。

2.2 SQL語句調節:

如何Join:

關於驅動表的選取,選用join key分佈最均勻的表作為驅動表

做好列裁剪和filter操作,以達到兩表做join的時候,資料量相對變小的效果。

大小表Join:

使用map join讓小的維度表(1000條以下的記錄條數) 先進記憶體。在map端完成reduce.

大表Join大表:

把空值的key變成一個字串加上隨機數,把傾斜的資料分到不同的reduce上,由於null值關聯不上,處理後並不影響最終結果。

count distinct大量相同特殊值

count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最後結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。

group by維度過小:

採用sum() group by的方式來替換count(distinct)完成計算。

特殊情況特殊處理:

在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的資料單獨拿出來處理。最後union回去。

3典型的業務場景

3.1空值產生的資料傾斜

場景:如日誌中,常會有資訊丟失的問題,比如日誌中的 user_id,如果取其中的 user_id 和 使用者表中的user_id 關聯,會碰到資料傾斜的問題。

解決方法1: user_id為空的不參與關聯(紅色字型為修改後)

複製程式碼
select * from log a
  join users b
  on a.user_id is not null
  and a.user_id = b.user_id
union all
select * from log a
  where a.user_id is null;
複製程式碼

解決方法2 :賦與空值分新的key值

select *
  from log a
  left outer join users b
  on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;

結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , ’’, null 等) 產生的傾斜問題。把空值的 key 變成一個字串加上隨機數,就能把傾斜的資料分到不同的reduce上 ,解決資料傾斜問題。

3.2不同資料型別關聯產生資料傾斜

場景:使用者表中user_id欄位為int,log表中user_id欄位既有string型別也有int型別。當按照user_id進行兩個表的Join操作時,預設的Hash操作會按int型的id來進行分配,這樣會導致所有string型別id的記錄都分配到一個Reducer中。

解決方法:把數字型別轉換成字串型別

select * from users a
  left outer join logs b
  on a.usr_id = cast(b.user_id as string)

3.3小表不小不大,怎麼用 map join 解決傾斜問題

使用 map join 解決小表(記錄數少)關聯大表的資料傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特別的處理。 以下例子:

select * from log a
  left outer join users b
  on a.user_id = b.user_id;

users 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支援這麼大的小表。如果用普通的 join,又會碰到資料傾斜的問題。

解決方法:

複製程式碼
select /*+mapjoin(x)*/* from log a
  left outer join (
    select  /*+mapjoin(c)*/d.*
      from ( select distinct user_id from log ) c
      join users d
      on c.user_id = d.user_id
    ) x
  on a.user_id = b.user_id;
 
複製程式碼

假如,log裡user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點選的會員不會太多,有佣金的會員不會太多等等。所以這個方法能解決很多場景下的資料傾斜問題。

4總結

使map的輸出資料更均勻的分佈到reduce中去,是我們的最終目標。由於Hash演算法的侷限性,按key Hash會或多或少的造成資料傾斜。大量經驗表明資料傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。在此給出較為通用的步驟:

1、取樣log表,哪些user_id比較傾斜,得到一個結果表tmp1。由於對計算框架來說,所有的資料過來,他都是不知道資料分佈情況的,所以取樣是並不可少的。

2、資料的分佈符合社會學統計規則,貧富不均。傾斜的key不會太多,就像一個社會的富人不多,奇特的人不多一樣。所以tmp1記錄數會很少。把tmp1和users做map join生成tmp2,把tmp2讀到distribute file cache。這是一個map過程。

3、map讀入users和log,假如記錄來自log,則檢查user_id是否在tmp2裡,如果是,輸出到本地檔案a,否則生成<user_id,value>的key,value對,假如記錄來自member,生成<user_id,value>的key,value對,進入reduce階段。

4、最終把a檔案,把Stage3 reduce階段輸出的檔案合併起寫到hdfs。

如果確認業務需要這樣傾斜的邏輯,考慮以下的優化方案:

1、對於join,在判斷小表不大於1G的情況下,使用map join

2、對於group by或distinct,設定 hive.groupby.skewindata=true

3、儘量使用上述的SQL語句調節進行優化

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

傾斜的原因:

  使map的輸出資料更均勻的分佈到reduce中去,是我們的最終目標。由於Hash演算法的侷限性,按key Hash會或多或少的造成資料傾斜。大量經驗表明資料傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。

解決思路:

  Hive的執行是分階段的,map處理資料量的差異取決於上一個stage的reduce輸出,所以如何將資料均勻的分配到各個reduce中,就是解決資料傾斜的根本所在

具體辦法:

記憶體優化和I/O優化:

  驅動表:使用大表做驅動表,以防止記憶體溢位;Join最右邊的表是驅動表;Mapjoin無視join順序,用大表做驅動表;StreamTable。

 1. Mapjoin是一種避免避免資料傾斜的手段

  允許在map階段進行join操作,MapJoin把小表全部讀入記憶體中,在map階段直接拿另外一個表的資料和記憶體中表資料做匹配,由於在map是進行了join操作,省去了reduce執行的效率也會高很多

在《hive:join遇到問題》有具體操作

  在對多個表join連線操作時,將小表放在join的左邊,大表放在Jion的右邊,

  在執行這樣的join連線時小表中的資料會被快取到記憶體當中,這樣可以有效減少發生記憶體溢位錯誤的機率

 2. 設定引數

  hive.map.aggr = true

  hive.groupby.skewindata=true 還有其他引數 

3.SQL語言調節

  比如: group by維度過小時:採用sum() group by的方式來替換count(distinct)完成計算

4.StreamTable

  將在reducer中進行join操作時的小table放入記憶體,而大table通過stream方式讀取 

5.索引

  Hive從0.80開始才有,提供了一個Bitmap點陣圖索引,索引可以加快GROUP BY查詢語句的執行速度,用的較少。 
       

其他優化:

1、 列裁剪(Column pruning):只有需要用到的列才進行輸出 

2、 謂詞下推(Predicate pushdown):儘早進行資料過濾(見圖表 7中,下面為先處理的邏
輯),減少後續處理的資料量 

3、 分割槽裁剪(Partition pruning):只讀取滿足分割槽條件的檔案 
4、 map-join:對於join中一些小檔案,可以在map階段進行join操作,見3.2.2節map-join部分 
5、 join-reordering:將在reducer中進行join操作時的小table放入記憶體,而大table通過
stream方式讀取 
6、 Group-by優化: 進行區域性聚合進行優化(包括hash-based和sort-based),對於skew
的key(key的row num和size在reduce時非常不均)可以進行兩次map-reduce的方式優化 

Hive的配置引數比較保守,所以效率會比較差一點,修改配置會讓查詢效率有比較大的提升,記錄幾個對查詢效率影響比較重要的引數。

元資料:

巢狀SQL並行執行優化:

set hive.exec.parallel=true;

set hive.exec.parallel.thread.number=16;

四、排序優化

   Order by 實現全域性排序,一個reduce實現,效率低

   Sort by 實現部分有序,單個reduce輸出的結果是有序的,效率高,通常和DISTRIBUTE BY關鍵字一起使用(DISTRIBUTE BY關鍵字 可以指定map 到 reduce端的分發key)

   CLUSTER BY col1 等價於DISTRIBUTE BY col1 SORT BY col1.

五、合併小檔案

   檔案數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合併 Map 和 Reduce 的結果檔案來儘量消除這樣的影響

   hive.merge.mapfiles = true是否和並 Map 輸出檔案,預設為 True

   hive.merge.mapredfiles = false是否合併 Reduce 輸出檔案,預設為 False

   hive.merge.size.per.task = 256*1000*1000合併檔案的大小。

   這裡的引數沒有寫到上面的表格裡是因為這是可以根據任務不同臨時設定的,而不一定非要是全域性設定。有時候全域性設定了反而對大檔案的操作有效能影響。

六、使用分割槽,RCFile,lzo,ORCFile等

   Hive中的每個分割槽都對應hdfs上的一個目錄,分割槽列也不是表中的一個實際的欄位,而是一個或者多個偽列,在表的資料檔案中實際上並不儲存分割槽列的資訊與資料。Partition關鍵字中排在前面的為主分割槽(只有一個),後面的為副分割槽

   靜態分割槽:靜態分割槽在載入資料和使用時都需要在sql語句中指定

   例:(stat_date=‘20120625‘,province=‘hunan‘)

   動態分割槽:使用動態分割槽需要設定hive.exec.dynamic.partition引數值為true,預設值為false,在預設情況下,hive會假設主分割槽時靜態分割槽,副分割槽使用動態分割槽;如果想都使用動態分割槽,需要設定set hive.exec.dynamic.partition.mode=nostrick,預設為strick

   例:(stat_date=‘20120625‘,province)