Hive 調優
HIve調優
1、Fetch抓取機制
我們在剛開始學習hive的時候,都知道hive可以降低程式設計師的學習成本和開發成本,具體表現就在於可以將SQL語句轉換成MapReduce程式執行。但是Hive中對某些情況的查詢可以不必使用MapReduce計算。例如:SELECT * FROM employees,在這種情況下,Hive可以簡單地讀取employee對應的儲存目錄下的檔案,然後輸出查詢結果到控制檯。
在hive-default.xml檔案中hive.fetch.task.conversion預設是more,老版本hive預設是minimal,該屬性修改為more以後,在全域性查詢
把hive.fetch.task.conversion設定成none,所有的程式都走mapreduce程式會耗費一定的時間。但就算設定成more,也只有部分sql語句會不走mapreduce程式,那有沒有什麼辦法可以優化這個問題呢?
-
功能:在執行sql的時候,能不走MapReduce程式處理就儘量不走MapReduce程式處理。
-
儘量直接去操作資料檔案。
-
在下述3種情況下 sql不走mr程式
- --全域性查詢
select * from student;
- --欄位查詢
select num,name from student;
- --limit 查詢
select num,name from student limit 2;
- --全域性查詢
2、mapreduce本地模式
Hive 在叢集上查詢時,預設是在叢集上 N 臺機器上執行, 需要多個機器進行協調執行,這個方式很好地解決了大資料量的查詢問題。但是當 Hive 查詢處理的資料量比較小時,其實沒有必要啟動分散式模式去執行,因為以分散式方式執行就涉及到跨網路傳輸、多節點協調 等,並且消耗資源。mapreduce程式除了可以提交到yarn執行之外,還可以使用本地模擬環境執行,此時就不是分散式執行的程式,對於小資料集,使用本地模式執行時間可以明顯被縮短。
使用者可以通過設定hive.exec.mode.local.auto的值為true,來讓Hive在適當的時候自動啟動這個優化。
-
功能:如果非要執行MapReduce程式,能夠本地執行的,儘量不提交yarn上執行。
-
預設是關閉的。意味著只要走MapReduce就提交yarn執行。
mapreduce.framework.name = local 本地模式 mapreduce.framework.name = yarn 叢集模式
-
Hive提供了一個引數,自動切換MapReduce程式為本地模式,如果不滿足條件,就執行yarn模式。
set hive.exec.mode.local.auto = true; --3個條件必須都滿足 自動切換本地模式 The total input size of the job is lower than: hive.exec.mode.local.auto.inputbytes.max (128MB by default) --資料量小於128M The total number of map-tasks is less than: hive.exec.mode.local.auto.tasks.max (4 by default) --maptask個數少於4個 The total number of reduce tasks required is 1 or 0. --reducetask個數是0 或者 1
3、表的優化
(1)join優化
底層還是MapReduce的join優化
-
Map 端 join
適合於小表join大表或者小表Join小表
Map 端 join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可
如果不指定MapJoin或者不符合MapJoin的條件,那麼Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。容易發生資料傾斜。可以用MapJoin把小表全部載入到記憶體在map端進行join,避免reducer處理。
-
map端join的引數設定:
開啟mapjoin引數設定:
(1)設定自動選擇map join
set hive.auto.convert.join = true;
預設為true(2)大表小表的閾值設定:
set hive.mapjoin.smalltable.filesize= 25000000;
小表的輸入檔案大小的閾值(以位元組為單位);如果檔案大小 小於此閾值,它將嘗試將common join轉換為map join。
因此在實際使用中,只要根據業務把握住小表的閾值標準即可,hive會自動幫我們完成mapjoin,提高執行的效率。
-
-
reduce 端 join
適合於大表Join大表
reduce 端 join是一種最簡單的join方式,其主要思想如下:
在map階段,map函式同時讀取兩個檔案File1和File2,為了區分兩種來源的key/value資料對,對每條資料打一個標籤(tag),比如:tag=0表示來自檔案File1,tag=2表示來自檔案File2。即:map階段的主要任務是對不同檔案中的資料打標籤。
在reduce階段,reduce函式獲取key相同的來自File1和File2檔案的value list, 然後對於同一個key,對File1和File2中的資料進行join(笛卡爾乘積)。即:reduce階段進行實際的連線操作。
Reduce 端 join是非常低效的,因為shuffle階段要進行大量的資料傳輸
-
(1)、空值過濾
有時join超時是因為某些key對應的資料太多,而相同key對應的資料都會發送到相同的reducer上,從而導致記憶體不夠。此時我們應該仔細分析這些異常的key,很多情況下,這些key對應的資料是異常資料,我們需要在SQL語句中進行過濾。例如 key對應的欄位為空的時候
-
對空值進行過濾
SELECT a.* FROM (SELECT * FROM tableA WHERE A.id IS NOT NULL ) a JOIN tableB b ON a.id = b.id;
-
-
(2)、空值轉換
有時雖然某個key為空對應的資料很多,但是相應的資料不是異常資料,必須要包含在join的結果中,此時我們可以表A中key為空的欄位賦一個隨機的值,使得資料隨機均勻地分不到不同的reducer上。
-
如果不給 null賦隨機的值會怎樣?
這樣的後果就是所有為null值的id全部都變成了相同的字串,及其容易造成資料的傾斜(所有的key相同,相同key的資料會到同一個reduce當中去)。
-
如何給null 賦一個隨機的值? Hive中的 rand()函式
SELECT a.* FROM tableA a LEFT JOIN tableB b ON CASE WHEN a.id IS NULL THEN concat('null', rand()) ELSE a.id END = b.id;
-
-
-
Q:如果兩張表都是大表,想走Map join 怎麼辦?怎麼辦?
-
把大表變小
-
在join之前where過濾無關資料
-
還可以分桶 join (bucket join)
-
分桶:表分為多個檔案,相同的欄位分到同一個桶裡
-
Hive採用對列值雜湊,然後除以桶的個數求餘的方式決定該條記錄存放在哪個桶當中。
-
分桶欄位=join 欄位
-
分桶之後還可以進行排序 Sort Merge Bucket Join
-
set hive.optimize.buketmapjoin=true 預設
-
-
-
-
bucket join
適合於大表Join大表
-
方式1:Bucktet Map Join
語法: clustered by colName(參與join的欄位) 引數: set hive.optimize.bucketmapjoin = true 要求: 分桶欄位 = Join欄位 ,分桶的個數相等或者成倍數,必須是在map join中
-
方式2:Sort Merge Bucket Join(SMB)
基於有序的資料Join 語法:clustered by colName sorted by (colName) 引數 set hive.optimize.bucketmapjoin = true; set hive.auto.convert.sortmerge.join=true; set hive.optimize.bucketmapjoin.sortedmerge = true; set hive.auto.convert.sortmerge.join.noconditionaltask=true; 要求: 分桶欄位 = Join欄位 = 排序欄位,分桶的個數相等或者成倍數
-
4、資料傾斜優化
-
group by資料傾斜
預設情況下,當進行group by的時候,Map階段同一Key資料分發給一個reduce,當一個key資料過大時就傾斜了。但並不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端進行部分聚合,最後在Reduce端得出最終結果。
-
方案一:開啟Map端聚合
- 是否在Hive Group By 查詢中使用map端聚合。
hive.map.aggr=true;
這個設定可以將頂層的部分聚合操作放在Map階段執行,從而減輕清洗階段資料傳輸和Reduce階段的執行時間,提升總體效能。但是指標不治本。
- 是否在Hive Group By 查詢中使用map端聚合。
-
方案二:實現隨機分割槽
實現隨機分割槽 select * from table distribute by rand();
-
方案三:資料傾斜時自動負載均衡
hive.groupby.skewindata=true; #開啟該引數以後,當前程式會自動通過兩個MapReduce來執行 #第一個MapReduce自動進行隨機分佈到Reducer中,每個Reducer做部分聚合操作,輸出結果 #第二個MapReduce將上一步聚合的結果再按照業務(group by key)進行處理,保證相同的分佈到一起,最終聚合得到結果
-
-
join資料傾斜
-
方案一:提前過濾,將大資料變成小資料,實現Map Join
-
方案二:使用Bucket Join
-
方案三:使用Skew Join
#將Map Join和Reduce Join進行合併,如果某個值出現了資料傾斜,就會將產生資料傾斜的資料單獨使用Map Join來實現 #其他沒有產生資料傾斜的資料由Reduce Join來實現,這樣就避免了Reduce Join中產生資料傾斜的問題 #最終將Map Join的結果和Reduce Join的結果進行Union合併 #開啟執行過程中skewjoin set hive.optimize.skewjoin=true; #如果這個key的出現的次數超過這個範圍 set hive.skewjoin.key=100000; #在編譯時判斷是否會產生資料傾斜 set hive.optimize.skewjoin.compiletime=true; set hive.optimize.union.remove=true; #如果Hive的底層走的是MapReduce,必須開啟這個屬性,才能實現不合並 set mapreduce.input.fileinputformat.input.dir.recursive=true;
-
5、 MapReduce引擎並行度調整
-
maptask個數
通常情況下,作業會通過input的目錄產生一個或者多個map任務。主要的決定因素有:input的檔案總個數,input的檔案大小,叢集設定的檔案塊大小。
-
如果是在MapReduce中 maptask個數是通過邏輯切片機制決定的。
-
但是在hive中,影響的因素很多。比如邏輯切片機制,檔案是否壓縮、壓縮之後是否支援切割。
-
因此在Hive中,調整MapTask的個數,直接去HDFS調整檔案的大小和個數,效率較高。
-
如果小檔案多,就進行小檔案的合併 ,合併的大小最好=block size
-
如果大檔案多,就調整block size
當input的檔案都很大,任務邏輯複雜,map執行非常慢的時候,可以考慮增加Map數,來使得每個map處理的資料量減少,從而提高任務的執行效率。
增加map的方法為:
1、
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
調整maxSize最大值。讓maxSize最大值低於blocksize就可以增加map的個數。2、如果表a只有一個檔案,大小為120M,但包含幾千萬的記錄,如果用1個map去完成這個任務,肯定是比較耗時的,這種情況下,我們要考慮將這一個檔案合理的拆分成多個,這樣就可以用多個map任務去完成。
set mapreduce.job.reduces =10; create table a_1 as select * from a distribute by rand(1222);
這樣會將a表的記錄,隨機的分散到包含10個檔案的a_1表中,再用a_1代替上面sql中的a表,則會用10個map任務去完成。每個map任務處理大於12M(幾百萬記錄)的資料,效率肯定會好很多。
-
-
-
reducetask個數
reduce個數並不是越多越好
1)過多的啟動和初始化reduce也會消耗時間和資源;
2)另外,有多少個reduce,就會有多少個輸出檔案,如果生成了很多個小檔案,那麼如果這些小檔案作為下一個任務的輸入,則也會出現小檔案過多的問題;
在設定reduce個數的時候也需要考慮這兩個原則:處理大資料量利用合適的reduce數;使單個reduce任務處理資料量大小要合適;
-
如果在MapReduce中,通過程式碼可以直接指定 job.setNumReduceTasks(N)
-
在Hive中,reducetask個數受以下幾個條件控制的
(1)每個 Reduce 處理的資料量預設是 256MB hive.exec.reducers.bytes.per.reducer=256000000 (2)每個任務最大的 reduce 數,預設為 1009 hive.exec.reducsers.max=1009 (3)mapreduce.job.reduces 該值預設為-1,由 hive 自己根據任務情況進行判斷。 --如果使用者使用者不設定 hive將會根據資料量或者sql需求自己評估reducetask個數。 --使用者可以自己通過引數設定reducetask的個數 set mapreduce.job.reduces = N --使用者設定的不一定生效,如果使用者設定的和sql執行邏輯有衝突,比如order by,在sql編譯期間,hive又會將reducetask設定為合理的個數。 Number of reduce tasks determined at compile time: 1
-
6、並行執行機制
Hive會將一個查詢轉化成一個或者多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合併階段、limit階段。或者Hive執行過程中可能需要的其他階段。
預設情況下,Hive一次只會執行一個階段。不過,某個特定的job可能包含眾多的階段,而這些階段可能並非完全互相依賴的,也就是說有些階段是可以並行執行的,這樣可能使得整個job的執行時間縮短。不過,如果有更多的階段可以並行執行,那麼job可能就越快完成。
通過設定引數hive.exec.parallel值為true,就可以開啟併發執行。不過,在共享叢集中,需要注意下,如果job中並行階段增多,那麼叢集利用率就會增加。
-
前提是stage之間沒有依賴 並行的弊端是瞬時伺服器壓力變大。
-
引數
set hive.exec.parallel=true; --是否並行執行作業。適用於可以並行執行的 MapReduce 作業,例如在多次插入期間移動檔案以插入目標 set hive.exec.parallel.thread.number=16; --最多可以並行執行多少個作業。預設為8。
7、推測執行機制
-
推測執行機制
在分散式叢集環境下,因為程式Bug(包括Hadoop本身的bug),負載不均衡或者資源分佈不均等原因,會造成同一個作業的多個任務之間執行速度不一致,有些任務的執行速度可能明顯慢於其他任務(比如一個作業的某個任務進度只有50%,而其他所有任務已經執行完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop採用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖後腿”的任務,併為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份資料,並最終選用最先成功執行完成任務的計算結果作為最終結果。
hadoop中預設兩個階段都開啟了推測執行機制。
- MapReduce中task的一個機制。
- 功能:
- 一個job底層可能有多個task執行,如果某些拖後腿的task執行慢,可能會導致最終job失敗。
- 所謂的推測執行機制就是通過演算法找出拖後腿的task,為其啟動備份的task
- 兩個task同時處理一份資料,誰先處理完,誰的結果作為最終結果。
- 推測執行機制預設是開啟的,但是在企業生產環境中建議關閉。
關於調優推測執行機制,還很難給一個具體的建議。如果使用者對於執行時的偏差非常敏感的話,那麼可以將這些功能關閉掉。如果使用者因為輸入資料量很大而需要執行長時間的map或者Reduce task的話,那麼啟動推測執行造成的浪費是非常巨大。
8、嚴格模式
Hive提供了一個嚴格模式,可以防止使用者執行那些可能意向不到的不好的影響的查詢。
- 注意。不要和動態分割槽的嚴格模式搞混淆。
通過設定屬性hive.mapred.mode值為預設是非嚴格模式nonstrict 。開啟嚴格模式需要修改hive.mapred.mode值為strict,開啟嚴格模式可以禁止3種類型的查詢。
1)對於分割槽表,除非where語句中含有分割槽欄位過濾條件來限制範圍,否則不允許執行。換句話說,就是使用者不允許掃描所有分割槽。進行這個限制的原因是,通常分割槽表都擁有非常大的資料集,而且資料增加迅速。沒有進行分割槽限制的查詢可能會消耗令人不可接受的巨大資源來處理這個表。
2)對於使用了order by語句的查詢,要求必須使用limit語句。因為order by為了執行排序過程會將所有的結果資料分發到同一個Reducer中進行處理,強制要求使用者增加這個LIMIT語句可以防止Reducer額外執行很長一段時間。
3)限制笛卡爾積的查詢。對關係型資料庫非常瞭解的使用者可能期望在執行JOIN查詢的時候不使用ON語句而是使用where語句,這樣關係資料庫的執行優化器就可以高效地將WHERE語句轉化成那個ON語句。不幸的是,Hive並不會執行這種優化,因此,如果表足夠大,那麼這個查詢就會出現不可控的情況。
9、笛卡爾積
儘量避免笛卡爾積,即避免join的時候不加on條件,或者無效的on條件,Hive只能使用1個reducer來完成笛卡爾積。
10、使用分割槽剪裁、列剪裁
-
在SELECT中,只拿需要的列,如果有,儘量使用分割槽過濾,少用SELECT * 。
-
在分割槽剪裁中,當使用外關聯時,如果將副表的過濾條件寫在Where後面,那麼就會先全表關聯,之後再過濾。
-
先關聯再Where:
select a.id from tableA left join tableB b on a.id=b.id where b.id<=10
-
正確的寫法是:先Where再關聯
select a.id from tableA left join tableB b on (b.id <= 10 and a.id=b.id)
-
11、動態分割槽調整
關係型資料庫中,對分割槽表Insert資料時候,資料庫自動會根據分割槽欄位的值,將資料插入到相應的分割槽中,Hive中也提供了類似的機制,即動態分割槽(Dynamic Partition),只不過,使用Hive的動態分割槽,需要進行相應的配置。
1)開啟動態分割槽引數設定
-
開啟動態分割槽功能(預設true,開啟)
hive.exec.dynamic.partition=true
-
設定為非嚴格模式(動態分割槽的模式,預設strict,表示必須指定至少一個分割槽為靜態分割槽,nonstrict模式表示允許所有的分割槽欄位都可以使用動態分割槽。)
hive.exec.dynamic.partition.mode=nonstrict
-
(3)在所有執行MR的節點上,最大一共可以建立多少個動態分割槽。
hive.exec.max.dynamic.partitions=1000
-
(4)在每個執行MR的節點上,最大可以建立多少個動態分割槽。該引數需要根據實際的資料來設定。比如:源資料中包含了一年的資料,即day欄位有365個值,那麼該引數就需要設定成大於365,如果使用預設值100,則會報錯。
hive.exec.max.dynamic.partitions.pernode=100
-
(5)整個MR Job中,最大可以建立多少個HDFS檔案。
hive.exec.max.created.files=100000
-
(6)當有空分割槽生成時,是否丟擲異常。一般不需要設定。
hive.error.on.empty.partition=false