hive優化分享
粘貼一下我在部門中的一次hive優化的分享。
簡述
hive構建在hadoop基礎上,利用分布式存儲,通過mr引擎實現對大數據的計算。MR會頻繁地讀寫磁盤而且MR任務的啟動成本很高。對於hive優化顯得尤為重要。而優化的核心就是更好地利用hadoop的分布式特性和hive的有點。本篇從IO、參數設置、案例實戰來說明如何優化我們的hive。受限於個人能力,如有不足之處,還望指出,一起溝通討論。
1、IO
A、通過列裁剪,只讀取需要的列[對select * 的做法應進行嚴格要求,甚至禁止]
B、 join操作前,應提前處理表,只讀取需要的列,並對數據進行梳理。
原:select A.char1,B.char2
from A join B
on A.id=B.id
where A.disabled=0 and B.disabled=0
優:select A.char1,B.char2
from
(
select id,char1
from a
where a.disabled=0
) A
join
(
select id,char2
from b
where b.disabled=0
) B
on A.id=B.id
C、選擇合適的存儲類型
建議使用orc存儲格式,作為默認設置[支持切片,有壓縮,提高查詢速度]。創建表時,指定stored as orcfile
D、走分區[可選]
減少讀取更多的分區文件,極大地有幫助!處理分區時,不可對分區字段進行操作,否則將會導致全表掃描。
2、join
A、left semi join取代in
根據測試,對於在大數據量中查詢某些取值時,有效提升查詢效率。[測試:從erp_fct_order表中,查找若幹數據。left semi join 使用了100‘+,in耗時300‘+]
B、map join
當小表與大表進行關聯時,通過map join,使左表進入內存,與右表進行關聯。這個階段不涉及reduce。
通過設置set hive.auto.convert.join=true;[默認]
set hive.mapjoin.smalltable.filesize=2500000;[25M,通過配置該屬性來確定使用該優化的表的大小,如果表的大小小於此值就會被加載進內存中]
3、設置執行參數
reducer數量
我們執行語句時,總會看到這個:
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1515122600431_869586, Tracking URL = http://hadoop04:8088/proxy/application_1515122600431_869586/
上面其實就是,hadoop確定reduce的數量的方式。
N=min(hive.exec.reducers.max,總數據量大小/hive.exec.reducers.bytes.per.reducer),默認情況下,hive.exec.reducers.max=999,hive.exec.reducers.bytes.per.reducer=1G;
我們也可以強制指定reduce數量,mapreduce.job.reduces=N。
需要註意的是,有的reduce為0,有的只為1。
為0,即為該任務只有map,沒有reduce。
為1,即為全局,如order by、count、笛卡爾積。
mapper的數量
map端處理文件時,將對文件進行拆分成若幹個task數。而如何拆分,這本身和文件的存儲類型相關。hive系統默認的類型為org.apache.hadoop.hive.ql.io.CombineHiveInputFormat[set hive.input.format;],當我們不指定存儲類型時,hive將根據文件本身設置對應的存儲類型。如:org.apache.hadoop.mapred.TextInputFormat。
繼承了FileInputFormat的切分算法:splitSize = max{split.size,min{goalSize,blockSize}}
若格式為:HiveInputFormat
split.size:mapred.min.split.size,默認為1。切片最小大小,沒有什麽能比這個更小的了。
goalSize:目標大小,為用戶設置的map task數與實際大小決定。goalSize=總大小/設置的mapred.map.tasks的大小[默認為2]
blockSize:塊大小[set dfs.block.size],當前我們系統的塊為:128M
所以,當我們沒有設置最小的切片大小時,切片的大小,取決於文件大小、設置的mapred.map.tasks數、塊大小.
例如:一個文件如果是300M,則切片大小為:splitSize=max{1,min{300/2,128}}=128M,則3個map對應的文件大小為:128,128,46
若格式為:TextInputFormat:
split.size:mapred.max.split.size,[當前系統為256M],則splitSize為max{256,min{300/2,128}}=256,則只有一個map.
小結
mapper的數量合適,單個mapper處理的數據量合適;拆分大小盡量與block數據塊相同,避免一個拆分塊大小有多個 hdfs 塊,且位於不同數據節點,從而降低網絡 IO。如果想減少mapper的數據量,一般只要將mapred.max.split.size設置的更大就可以了。
合並小文件
hive基於NameNode進行調度管理和任務分發,NameNode本身是有瓶頸的,有固定的大小。在hadoop任務中,NameNode加載必要的信息,其中包括各個文件的元數據(位置、大小、塊信息等)。一般情況下,當文件數達到1000W時,所占用的NameNode內存達到3G,將帶來性能下降。Hadoop本身對大文件具有很高的處理能力,但在mr任務過程中,map任務和reduce任務都會帶來小文件的問題。當小文件作為輸入,由map來進行處理時,啟用map帶來的時間比處理更耗時。此外,reduce數量多的話,產生的小文件也會增多。
輸入時合並
set mapred.max.split.size=256000000;-- 每個Map最大輸入大小,決定合並後的文件數
set mapred.min.split.size.per.node=100000000; -- 一個節點上split的至少的大小 ,決定了多個data node上的文件是否需要合並 [當前設置為:1,默認設置。設置該參數,一定要設置下方的rack參數,否則報錯。]
set mapred.min.split.size.per.rack=100000000; -- 一個交換機下split的至少的大小,決定了多個交換機上的文件是否需要合並 [當前設置為:1,默認設置]
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 執行Map前進行小文件合並[經過測試,設置上面的參數就能夠讓小文件進行合並了。]
輸出時合並
set hive.merge.mapfiles=true; -- 在map-only job後合並文件,默認true
set hive.merge.mapredfiles=true; -- 在map-reduce job後合並文件,默認false
set hive.merge.size.per.task=256000000; -- 合並後每個文件的大小,默認256000000
set hive.merge.smallfiles.avgsize=16000000; -- 平均文件大小,當結果文件小於該值時,將進行合並
數據傾斜
原因:當我們進行join操作時,會在map端根據key的hash值,shuffle到某一個reduce上去,在reduce端做join連接操作,內存中緩存join左邊的表,遍歷右邊的表,一次做join操作。所以在做join操作時候,將數據量多的表放在join的右邊。當數據量比較大,並且key分布不均勻,大量的key都shuffle到一個reduce上了,就出現了數據的傾斜。
場景
key為空:當join的key存在空值時,這些值會被hash到同一個reduce中,導致數據傾斜。遇到這樣的情況,建議區分空和非空
count(distinct):數據聚合類的操作sum、count,因為已經在map端做了聚合操作了,到reduce端的數據相對少一些[主要設置參數為:1.set hive.map.aggr=true;默認值,是否在map端進行聚合;2.set hive.groupby.mapaggr.checkinterval = 100000;默認值,在 Map 端進行聚合操作的條目數目],不會出現這個問題。當distinct時,會對group by後的字段和distinct的字段進行hash,如果存在大量重復值,則會導致數據傾斜。
遇到傾斜時set hive.groupby.skewindata = true;[負載均衡,默認為false。查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最後完成最終的聚合操作]
常用參數設置及說明
--類型:輸入時合並小文件
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ----可在map端進行合並,減少小文件
set mapred.max.split.size=256000000; ----設置一個合理的數值,最大的切片數據。影響map的個數
set mapred.min.split.size.per.node=134217728; ----一個節點上的最小切片大小。設置為block大小
set mapred.min.split.size.per.rack=256000000; ----一個交換機上的最小切片大小
--類型:資源占用量
set mapreduce.map.memory.mb=5120;
set mapreduce.reduce.memory.mb=5120;
--類型:設置最大值
set mapreduce.job.reduces=80; ----用來對reduer數量進行控制,但設置的過大,不設置效果更好
set hive.exec.reducers.max=80; ----最大的reduce數
--類型:mapjoin
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=2500000;
--類型:輸出時合並
set hive.merge.mapfiles=true; -- 在map-only job後合並文件,默認true
set hive.merge.mapredfiles=true; -- 在map-reduce job後合並文件,默認false
set hive.merge.size.per.task=256000000; -- 合並後每個文件的大小,默認256000000
set hive.merge.smallfiles.avgsize=16000000; -- 平均文件大小,當結果文件小於該值時,將進行合並
--類型:並發及並發數
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
補充
在寫sql的過程中,有時候會用到排序。但是在hive中用order by排序,將帶來一個問題。只有一個reduce。往往帶來性能的較差。Hive為查詢提供了其他排序方式。
Order by:全局,一個reduce【不建議】
Distribute by :map的輸出排序,排序後,將對應的key通過hash放入到reduce中
Sort by:局部排序,在每個reduce中實現排序。當reduce數只有一個時,sort by和order by結果沒有什麽差異【建議】
Cluster by:分桶中用到,用來指定分桶的字段。cluster by key = distribute by key sort by key desc ,是一個簡寫。cluster by只能降序排序。
原創博客,轉載請註明出處!歡迎郵件溝通:[email protected]
hive優化分享