Hive Join 分析和優化
開啟微信掃一掃,關注微信公眾號【資料與演算法聯盟】
轉載請註明出處:http://blog.csdn.net/gamer_gyt
博主微博:http://weibo.com/234654758
Github:https://github.com/thinkgamer
背景
Sku對應品牌進行關聯,大表對應非大表(這裡的非大表並不能用小表來定義)
問題分析
進行表左關聯時,最後一個reduce任務卡到99%,執行時間很長,發生了嚴重的資料傾斜。
什麼是資料傾斜?資料傾斜主要表現在,map /reduce程式執行時,reduce節點大部分執行完畢,但是有一個或者幾個reduce節點執行很慢,導致整個程式的處理時間很長,這是因為某一個key的條數比其他key多很多(有時是百倍或者千倍之多),這條key所在的reduce節點所處理的資料量比其他節點就大很多,從而導致某幾個節點遲遲執行不完。
Hive join
MapReduce流程圖
Join流程
HQL:SELECT a.id, a.dept, b.age FROM a join b ON a.id = b.id
Map階段
1:讀取源表的資料,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
2:Map輸出的value為join之後所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag資訊,用於標明此value對應哪個表;
3:按照key進行排序;
4:map的個數取決於資料量和hdfs的分片大小,比如說hdfs分片配置的是128M(也可能是256M),然後資料目錄下有一個檔案為300M,那麼對於的map個數為3個;如果目錄下有三個檔案,分別為20M,40M,200M,則對應的map個數為4個。
5:Map個數是不是越多越好?答案是否定的。如果一個任務有很多小檔案(遠遠小於塊大小128m),則每個小檔案也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。
6:是不是保證每個map處理接近128m的檔案塊,就可以了? 答案也是不一定。比如有一個127m的檔案,正常會用一個map去完成,但這個檔案只有一個或者兩個小欄位,卻有幾千萬的記錄,如果map處理的邏輯比較複雜,用一個map任務去做,肯定也比較耗時。
Shuffle階段
根據key的值進行hash,並將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位於同一個reduce中
Reduce階段
1:根據key的值完成join操作,期間通過Tag來識別不同表中的資料
2:reduce的個數在不進行指定的情況下,hive會猜測一個reduce個數,基於以下兩個設定:
計算reducer數的公式很簡單N=min(引數2,總輸入資料量/引數1)
即,如果reduce的輸入(map的輸出)總大小不超過1G,那麼只會有一個reduce任務
3:reduce個數設定
1) 調整hive.exec.reducers.bytes.per.reducer引數的值;
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
2) set mapred.reduce.tasks = 15;
4:reduce個數是不是越多越好?同map一樣,啟動和初始化reduce也會消耗時間和資源; 另外,有多少個reduce,就會有多少個輸出檔案,如果生成了很多個小檔案,那麼如果這些小檔案作為下一個任務的輸入,則也會出現小檔案過多的問題;
Hive Join型別
參考:http://lxw1234.com/archives/2015/06/315.htm
優化
hive啟動的set設定
set hive.exec.compress.output = true;
-- map/reduce 輸出壓縮(一般採用序列化檔案儲存)
set mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec;
-- 壓縮檔案格式為 lzo
set hive.map.aggr = true;
-- map端是否聚合
set hive.merge.mapfiles = true;
-- map輸出是否合併
set hive.merge.mapredfiles = true;
-- reduce輸出是否合併
set hive.merge.size.per.task = 256000000;
-- 設定的合併檔案的大小
set hive.merge.smallfiles.avgsize = 134000000;
-- 當輸出檔案的平均大小小於該值時,啟動一個獨立的map-reduce任務進行檔案merge
set hive.groupby.mapaggr.checkinterval = 100000;
--這個是group的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定
set hive.groupby.skewindata = true;
-- 資料傾斜優化,為true時,查詢計劃生產兩個mapreduce,第一個mr隨機處理,第二個按照業務主鍵聚合
set hive.optimize.skewjoin = true;
-- 如果是join 過程出現傾斜 應該設定為true
set hive.skewjoin.key = 100000;
--這個是join的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定
set hive.exec.parallel = true;
-- 同一個sql中的不同的job是否可以同時執行
set mapred.reduce.tasks = 2000設定reduce 個數
set mapred.max.split.size = 100000000;
-- 一個split 最大的大小
set mapred.min.split.size.per.node = 100000000;
-- 一個節點上(datanode)split至少的大小
set mapred.min.split.size.per.rack = 100000000;
-- 同一個交換機(rack locality)下split至少的大小
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- 執行Map前進行小檔案合併
set mapred.queue.names = xxxxxxxx
-- 設定佇列
HQL本身注意事項
1):大小表 內關聯時,小表放前,載入到記憶體中
2):兩表join的key,做空值、不需要的值,不合格的值過濾,然後用hash 或者隨機數 結合case when then else進行處理
eg:需要關聯的key 為 brand_id,然後表中大部分brand_id ='0',這是時候在關聯時就要進行過濾,但僅僅過濾的話,這些為0的brand_id就會分配到一個reduce中,所以還要加個rand函式
on
(
case
when online.brand_code != '0'
then online.brand_code
else cast(ceiling(rand() * - 65535) as string)
end = brand.brand_idcase
)
3):如果關聯的某個key是必須的,但是下邊又有很多條資料記錄,然後導致資料傾斜
hash即可,比如說我要以三級品類和品牌為key進行join,但某個三級品類對於的品牌下的sku個數特別多,就會發生資料傾斜。
參考資料
http://shiyanjun.cn/archives/588.html
https://blog.csdn.net/B11050101/article/details/78754652