1. 程式人生 > >Hive Join 分析和優化

Hive Join 分析和優化

                                                       開啟微信掃一掃,關注微信公眾號【資料與演算法聯盟】 

轉載請註明出處:http://blog.csdn.net/gamer_gyt 
博主微博:http://weibo.com/234654758 
Github:https://github.com/thinkgamer

背景

Sku對應品牌進行關聯,大表對應非大表(這裡的非大表並不能用小表來定義)

 

問題分析

進行表左關聯時,最後一個reduce任務卡到99%,執行時間很長,發生了嚴重的資料傾斜。

什麼是資料傾斜?資料傾斜主要表現在,map /reduce程式執行時,reduce節點大部分執行完畢,但是有一個或者幾個reduce節點執行很慢,導致整個程式的處理時間很長,這是因為某一個key的條數比其他key多很多(有時是百倍或者千倍之多),這條key所在的reduce節點所處理的資料量比其他節點就大很多,從而導致某幾個節點遲遲執行不完。

Hive join

MapReduce流程圖

Join流程

HQL:SELECT a.id, a.dept, b.age FROM a join b ON a.id = b.id

 

Map階段

1:讀取源表的資料,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key; 

2:Map輸出的value為join之後所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag資訊,用於標明此value對應哪個表; 

3:按照key進行排序;

4:map的個數取決於資料量和hdfs的分片大小,比如說hdfs分片配置的是128M(也可能是256M),然後資料目錄下有一個檔案為300M,那麼對於的map個數為3個;如果目錄下有三個檔案,分別為20M,40M,200M,則對應的map個數為4個。

5:Map個數是不是越多越好?答案是否定的。如果一個任務有很多小檔案(遠遠小於塊大小128m),則每個小檔案也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。

6:是不是保證每個map處理接近128m的檔案塊,就可以了? 答案也是不一定。比如有一個127m的檔案,正常會用一個map去完成,但這個檔案只有一個或者兩個小欄位,卻有幾千萬的記錄,如果map處理的邏輯比較複雜,用一個map任務去做,肯定也比較耗時。

Shuffle階段

根據key的值進行hash,並將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位於同一個reduce中

Reduce階段

1:根據key的值完成join操作,期間通過Tag來識別不同表中的資料

2:reduce的個數在不進行指定的情況下,hive會猜測一個reduce個數,基於以下兩個設定:

 

計算reducer數的公式很簡單N=min(引數2,總輸入資料量/引數1) 

即,如果reduce的輸入(map的輸出)總大小不超過1G,那麼只會有一個reduce任務

3:reduce個數設定

1) 調整hive.exec.reducers.bytes.per.reducer引數的值; 

set hive.exec.reducers.bytes.per.reducer=500000000; (500M) 

2) set mapred.reduce.tasks = 15; 

4:reduce個數是不是越多越好?同map一樣,啟動和初始化reduce也會消耗時間和資源; 另外,有多少個reduce,就會有多少個輸出檔案,如果生成了很多個小檔案,那麼如果這些小檔案作為下一個任務的輸入,則也會出現小檔案過多的問題;

Hive Join型別

參考:http://lxw1234.com/archives/2015/06/315.htm

優化

hive啟動的set設定

set hive.exec.compress.output = true; 
-- map/reduce 輸出壓縮(一般採用序列化檔案儲存)
set mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec; 
-- 壓縮檔案格式為 lzo

set hive.map.aggr = true; 
-- map端是否聚合

set hive.merge.mapfiles = true; 
-- map輸出是否合併

set hive.merge.mapredfiles = true; 
-- reduce輸出是否合併

set hive.merge.size.per.task = 256000000; 
-- 設定的合併檔案的大小

set hive.merge.smallfiles.avgsize = 134000000; 
-- 當輸出檔案的平均大小小於該值時,啟動一個獨立的map-reduce任務進行檔案merge

set hive.groupby.mapaggr.checkinterval = 100000; 
--這個是group的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定

set hive.groupby.skewindata = true;
-- 資料傾斜優化,為true時,查詢計劃生產兩個mapreduce,第一個mr隨機處理,第二個按照業務主鍵聚合

set hive.optimize.skewjoin = true;
-- 如果是join 過程出現傾斜 應該設定為true

set hive.skewjoin.key = 100000; 
--這個是join的鍵對應的記錄條數超過這個值則會進行分拆,值根據具體資料量設定

set hive.exec.parallel = true; 
-- 同一個sql中的不同的job是否可以同時執行

set mapred.reduce.tasks = 2000設定reduce 個數

set mapred.max.split.size = 100000000; 
-- 一個split 最大的大小

set mapred.min.split.size.per.node = 100000000; 
-- 一個節點上(datanode)split至少的大小

set mapred.min.split.size.per.rack = 100000000; 
-- 同一個交換機(rack locality)下split至少的大小

set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 
-- 執行Map前進行小檔案合併

set mapred.queue.names = xxxxxxxx 
-- 設定佇列

HQL本身注意事項

1):大小表 內關聯時,小表放前,載入到記憶體中

2):兩表join的key,做空值、不需要的值,不合格的值過濾,然後用hash 或者隨機數 結合case when then else進行處理

eg:需要關聯的key 為 brand_id,然後表中大部分brand_id ='0',這是時候在關聯時就要進行過濾,但僅僅過濾的話,這些為0的brand_id就會分配到一個reduce中,所以還要加個rand函式

on

(

    case

    when online.brand_code != '0'

    then online.brand_code

    else cast(ceiling(rand() * - 65535) as string)

    end = brand.brand_idcase

)

3):如果關聯的某個key是必須的,但是下邊又有很多條資料記錄,然後導致資料傾斜

hash即可,比如說我要以三級品類和品牌為key進行join,但某個三級品類對於的品牌下的sku個數特別多,就會發生資料傾斜。

參考資料

http://shiyanjun.cn/archives/588.html

https://blog.csdn.net/B11050101/article/details/78754652