1. 程式人生 > >hive的數據傾斜以及優化策略

hive的數據傾斜以及優化策略

set 創建 sel 並不是 ipc 並排 case exe 經驗

1. hive的數據傾斜

介紹:只要在分布式一定有shuffle,避免不了出現數據傾斜,在混淆數據的過程中出現數據分布不均勻。比如:在MR編程中reducetask階中的數據的大小不一致,即很多的數據集中到了一個reducetask中,hive的數據傾斜就是mapreduce的數據傾斜 maptask reducetask最後就是reducetask階段的數據傾斜。
不會產生數據傾斜的場景
   - 不執行MapReduce程序,在hive中hive.fetch.task.conversion一共有三個可選值:
    - none:表示所有的語句都執行MR,這個參數不可用

    - minimal :表示select *、where 字段為分區字段、limit時不執行MR
    - more :select 、filter /where 、limit不執行MR
   - 聚合函數和group by一起使用的時候,在聚合函數和group by一起使用的時候,默認的MR底層在map端執行combiner,所以不會數據傾斜
會產生數據傾斜的場景
   - 聚合函數不和group by連用
   - count(distinct)
   - join 主要是reduce join會產生數據傾斜

具體場景分析:

1)join時null值過多

 以log日誌為例,其中有一個字段為userid,但是userid的null值太多,在使用userid進行join時,所有的userid=null的數據都會到一個reduce中,這個reducetask數據量很大,就會產生數據傾斜。
  解決方法1

#null值不參與連接
select field1,field2,field3…
from log a left join user b on a.userid is not null and a.userid=b.
userid
union select field1,field2,field3 from log where userid is null;

  解決方法2
#將null值進行散列

select 
    * 
    from log a left join user b on 
    case when a.userid is null then concat("null",rand()) 
    else a.userid end=b.userid;

2)連接的時候兩個表連接的列的類型不統一

user userid string 表一
log userid int 表二
select * from log a left join user b on a.userid=b.userid;
默認情況下,將string轉化為int類型,如果string類型的userid中是無法轉化為int類型,那麽返回了大量的null,然後大量的null, 又會分配到同一個reducetask中,造成數據傾斜。只要確定能將string轉化為int類型,就可以避免數據傾斜。

3)join端產生數據傾斜

大小表連接:大表和小表進行關聯的時候,使用map端的join,在map join 時是沒有數據傾斜的。其中有兩個參數:
   - hive.auto.convert.join #開啟map join,默認是開啟的
   - hive.smalltable.filesize #在進行mapjoin時對小表大小的限制,默認是25000000byte,大概25M
大小表連接,但是小表數據量較大
  這個小表不是很大,但是超過了25000000byte;此時默認執行reducejoin,此時如果執行了reduce join就容易產生數據傾斜,如果這時小表的大小不是很大不超過100M,那麽可以強制執行map join:

#強制執行map join
select
/*+mapjoin(表名)*/       #將小表強制放入內存
* from t1 join t2 on t1.field1=t2.field;

大表*大表:對其中的一個表進行過濾,將這個表轉化成相對小的表,然後強制執行map端join
這裏以兩個表為例:
user ----30G(所有用戶)
Log ----5G (當日記錄的日誌)

#先對log日誌表進行userid  過濾:
create table temp_log as 
                    select distinct userid  from log;
#將上面的結果和user表進行關聯:(獲取userid表中有效的關聯數據)
create table temp_user as   
select filed1, filed2,field3
                        /*+mapjoin(a)*/
                        from temp a join user b on a.userid =b.userid;
#最後,在將上面的表與log進行關聯:
select filed1, filed2,field3
                        /*+mapjoin(a)*/
                    from temp_user a join Log b on a. userid =b. userid;

2. hive的優化

(1)常用優化手段:

   - 好的設計模型,在設計表的時候註意數據傾斜
   - 解決數據傾斜問題
   - 減少job數量
   - 設置合理的reduce task個數
   - 了解數據的分布情況,手動解決數據傾斜
   - 在數據量比較大的時候,盡量少用全局聚合類的操作
   - 對小文件進行合並,減少maptask個數,提高性能

(1)具體的優化方案:

   ① 如何正確的選擇排序:
    - cluster by:對同一字段分桶並排序,不能和 sort by 連用
    - distribute by + sort by:分桶,保證同一字段值只存在一個結果文件當中,結合 sort by 保證 每個 reduceTask 結果有序
    - sort by:單機排序,單個 reduce 結果有序
    - order by:全局排序,缺陷是只能使用一個 reduce task
   ② 怎樣做笛卡爾積:當 Hive 設定為嚴格模式(hive.mapred.mode=strict)時,不允許在 HQL 語句中出現笛卡爾積
解決笛卡爾積問題:http://blog.51cto.com/14048416/2338651
文章中的:6)使用隨機前綴和擴容RDD進行join,有細致講解。
   ③ 怎樣寫好 in/exists:

#使用left semi join.去代替in/exists:
select a.id, a.name from a where a.id in (select b.id from b);
#變化為:
selecet a.id,a.name from a left semi join b on a.id=b.id;

技術分享圖片
博文:http://blog.51cto.com/14048416/2342407 其中的關於left semi join 的總結。
   ④ 合理處理maptask個數:
    Maptask個數太大:每個Maptask都要啟動一個jvm進程,啟動時間過長,效率低,Maptask個數太小:負載不均衡,大量作業時,容易阻塞集群。因此通常有兩種手段來解決問題:
   - 減少Maptask個數,通過合並小文件實現,主要針對數據源
   - 通過設置重用jvm進程的方式,減少MapReduce程序在啟動和關閉jvm進程的時間:(set mapred.job.reuse.jvm.num.tasks=5) 表示map task 重用同一個jvm.
   ⑤ 合理設置reduce task個數:
     reducer 個數的設定極大影響執行效率,這使得 Hive 怎樣決定 reducer 個數成為一個關鍵問題,默認的在hive中只啟動一個reducetask。其中有以下幾個參數作為調優點:
   - hive.exec.reducers.bytes.per.reducer #reduceTask的吞吐量
   - hive.exec.reducers.max #啟動的reducetask的最大值 經驗之談:0.95*(集群中 datanode 個數)
   - mapreduce.job.reduces= #設置reducetask的個數
   ⑥ 小文件合並:
     小文件過多,會給hdfs帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce 的 結果文件來消除這樣的影響,以下幾個參數可以作為調優點:
   - set hive.merge.mapfiles = true 在只有maptask時,任務結束時進行文件合並
   - set hive.merge.mapredfiles = false # true 時在 MapReduce 的任務結束時合並小文件
   - set hive.merge.size.per.task = 25610001000 #合並的小文件的大小
   - set mapred.max.split.size=256000000; #每個map最大分割數
   - set mapred.min.split.size.per.node=1; #一個節點上的最小split值
   - set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;,#執行map前進行小文件合並(默認開啟)
   ⑦ 合理的設置分區:
   Partition 就是分區。分區通過在創建表時啟用 partitioned by 實現,為了減少查詢時候的數據掃描範圍 提升查詢性能,當數據兩個比較大的時候,對經常按照某一個字段進行過濾查詢的時候,就需要按照過濾字段創建分區表。
   ⑧ 合理的利用存儲格式:
   創建表時,盡量使用 orc、parquet 這些列式存儲格式,因為列式存儲的表,每一列的數據在物理上是存儲在一起的,Hive 查詢時會只遍歷需要列數據,大大減少處理的數據量。
   ⑨ 並行化處理
   一個hive sql語句可能轉化為多個mapreduce Job,每一個job就是一個stage,這些job順序執行,在這個client的運行日誌也可以看到。但是有的時候這些任務之間並不是相互依賴的,如果集群資源允許,可以讓多個並不相互依賴的stage並發執行。以下有兩個參數可以調優:
    - set hive.exec.parallel=true; #開啟並行
    - set hive.exec.parallel.thread.number=8; //同一個 sql 允許並行任務的最大線程數
   ⑩ 設置壓縮存儲
    Hive最終是因為轉為MapReduce程序來執行,而MapReduce的性能瓶頸在與網絡和IO,要解決性能瓶頸,最主要的就是減少數據量,對數據進行壓縮是一個很好的辦法。
技術分享圖片

Job輸出文件按照block以gzip的方式進行壓縮:
set mapreduce.output.fileoutputformat.compress=true // 默認值是 false 
set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默認值是 Record 
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec

map輸出結果以gzip進行壓縮:
set mapred.map.output.compress=true 
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec

對hive輸出結果和中間都進行壓縮:
set hive.exec.compress.output=true // 默認值是 false,不壓縮 
set hive.exec.compress.intermediate=true // 默認值是 false,為 true 時 MR 設置的壓縮才啟用

hive的數據傾斜以及優化策略