1. 程式人生 > >hive 調優總結

hive 調優總結

hive調優是比較大的專題,需要結合實際的業務,資料的型別,分佈,質量狀況等來實際的考慮如何進行系統性的優化,hive底層是mapreduce,所以hadoop調優也是hive調優的一個基礎,hvie調優可以分為幾個模組進行考慮,資料的壓縮與儲存,sql的優化,hive引數的優化,解決資料的傾斜等。

一,資料的壓縮與儲存格式

對分析的資料選擇合適的儲存格式與壓縮方式能提高hive的分析效率:

1)壓縮方式

      壓縮可以節約磁碟的空間,基於文字的壓縮率可達40%+; 壓縮可以增加吞吐量和效能量(減小載入記憶體的資料量),但是在壓縮和解壓過程中會增加CPU的開銷。所以針對IO密集型的jobs(非計算密集型)可以使用壓縮的方式提高效能。 幾種壓縮演算法:

注意:選擇壓縮演算法的時候需要考慮到是否可以分割,如果不支援分割(切片的時候需要確定一條資料的完整性),則一個map需要執行完一個檔案,如果檔案很大,則效率很低。一般情況下hdfs一個塊(128M)就是一個map的輸入切片,而block是按物理切割的,可能一條資料會被切到兩個塊中去,而mapde 切片如何確保一條資料在一個切片中呢?這就是看壓縮演算法支不支援分割了,具體的實現機制需要看原始碼研究。

2)儲存格式(行存與列存)

1. TextFile

Hive資料表的預設格式,儲存方式:行儲存。 可以使用Gzip壓縮演算法,但壓縮後的檔案不支援split 在反序列化過程中,必須逐個字元判斷是不是分隔符和行結束符,因此反序列化開銷會比SequenceFile高几十倍。

2.Sequence Files

Hadoop中有些原生壓縮檔案的缺點之一就是不支援分割。支援分割的檔案可以並行的有多個mapper程式處理大資料檔案,大多數檔案不支援可分割是因為這些檔案只能從頭開始讀。Sequence File是可分割的檔案格式,支援Hadoop的block級壓縮。 Hadoop API提供的一種二進位制檔案,以key-value的形式序列化到檔案中。儲存方式:行儲存。 sequencefile支援三種壓縮選擇:NONE,RECORD,BLOCK。Record壓縮率低,RECORD是預設選項,通常BLOCK會帶來較RECORD更好的壓縮效能。 優勢是檔案和hadoop api中的MapFile是相互相容的

3. RCFile

儲存方式:資料按行分塊,每塊按列儲存。結合了行儲存和列儲存的優點:

首先,RCFile 保證同一行的資料位於同一節點,因此元組重構的開銷很低 其次,像列儲存一樣,RCFile 能夠利用列維度的資料壓縮,並且能跳過不必要的列讀取 資料追加:RCFile不支援任意方式的資料寫操作,僅提供一種追加介面,這是因為底層的 HDFS當前僅僅支援資料追加寫檔案尾部。 行組大小:行組變大有助於提高資料壓縮的效率,但是可能會損害資料的讀取效能,因為這樣增加了 Lazy 解壓效能的消耗。而且行組變大會佔用更多的記憶體,這會影響併發執行的其他MR作業。

4.ORCFile

儲存方式:資料按行分塊,每塊按照列儲存。 壓縮快,快速列存取。效率比rcfile高,是rcfile的改良版本。

5.Parquet

Parquet也是一種行式儲存,同時具有很好的壓縮效能;同時可以減少大量的表掃描和反序列化的時間

6、自定義格式

可以自定義檔案格式,使用者可通過實現InputFormat和OutputFormat來自定義輸入輸出格式。

壓縮與儲存格式的選擇

mapreduce可以選擇壓縮的地方:map階段的輸出和reduce階段的輸出。

設定方式:

1. map階段輸出資料壓縮 ,在這個階段,優先選擇一個低CPU開銷的演算法。

set hive.exec.compress.intermediate=true
set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec
set mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec;

2.hive.exec.compress.output:使用者可以對最終生成的Hive表的資料通常也需要壓縮。該引數控制這一功能的啟用與禁用,設定為true來宣告將結果檔案進行壓縮。 (也可以在建表的時候進行設定)

set hive.exec.compress.output=true 
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec

結論,一般選擇orcfile/parquet + snappy 的方式

建表語句:

create table tablename (
 xxx,string
 xxx, bigint
)
ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress" = "SNAPPY")

二、建立分割槽表,桶表,拆分表

1)建立分割槽表:(分割槽表相當於hive的索引,加快查詢速度)

CREATE external TABLE table_name    
(col1 string,  col2 double) 
partitioned by (date string)  
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' Stored AS TEXTFILE
location ‘xxxxx’;

alter table table_name add partitions(key = value) location 'xxxx' (收到設定分割槽,靜態分割槽)

設定動態分割槽

set hive.exec.dynamic.partition=true;(可通過這個語句檢視:set hive.exec.dynamic.partition;) 
set hive.exec.dynamic.partition.mode=nonstrict; (它的預設值是strick,即不允許分割槽列全部是動態的)
SET hive.exec.max.dynamic.partitions=100000;(如果自動分割槽數大於這個引數,將會報錯)
SET hive.exec.max.dynamic.partitions.pernode=100000;

2)建立桶表:

對於每一個表(table)或者分割槽, Hive可以進一步組織成桶,也就是說桶是更為細粒度的資料範圍劃分。Hive也是 針對某一列進行桶的組織。Hive採用對列值雜湊,然後除以桶的個數求餘的方式決定該條記錄存放在哪個桶當中。

把表(或者分割槽)組織成桶(Bucket)有兩個理由:

(1)獲得更高的查詢處理效率。桶為表加上了額外的結構,Hive 在處理有些查詢時能利用這個結構。具體而言,連線兩個在(包含連線列的)相同列上劃分了桶的表,可以使用 Map 端連線 (Map-side join)高效的實現。比如JOIN操作。對於JOIN操作兩個表有一個相同的列,如果對這兩個表都進行了桶操作。那麼將儲存相同列值的桶進行JOIN操作就可以,可以大大較少JOIN的資料量。

(2)使取樣(sampling)更高效。在處理大規模資料集時,在開發和修改查詢的階段,如果能在資料集的一小部分資料上試執行查詢,會帶來很多方便。

create table bucketed_user(id int,name string) clustered by (id)
 sorted by(name) into 4 buckets row format delimited fields terminated by '\t'
 stored as textfile;

3)拆分表:當你需要對一個很大的表做分析的時候,但不是每個欄位都需要用到,可以考慮拆分表,生成子表,減少輸入的資料量。並且過濾掉無效的資料,或者合併資料,進一步減少分析的資料量

create table tablename 
ROW FORMAT DELTMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties("orc.compress" = "SNAPPY")
as select XXX from XXXX

三、hive引數優化

1)fetch task 為執行hive時,不用執行MapReduce,如select * from emp;

Hive.fetch.task.conversion 預設為minimal

修改配置檔案hive-site.xml
<property>
  <name>hive.fetch.task.conversion</name>
  <value>more</value>
  <description>
    Some select queries can be converted to single FETCH task 
    minimizing latency.Currently the query should be single 
    sourced not having any subquery and should not have
    any aggregations or distincts (which incurrs RS), 
    lateral views and joins.
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more    : SELECT, FILTER, LIMIT only (+TABLESAMPLE, virtual columns)
  </description>
</property> 


或者當前session修改
hive> set hive.fetch.task.conversion=more;
執行SELECT id, money FROM m limit 10; 不走mr

2)並行執行

當一個sql中有多個job時候,且這多個job之間沒有依賴,則可以讓順序執行變為並行執行(一般為用到union all )

// 開啟任務並行執行
 set hive.exec.parallel=true;
 // 同一個sql允許並行任務的最大執行緒數 
set hive.exec.parallel.thread.number=8;

3)jvm 重用

 JVM重用對hive的效能具有非常大的 影響,特別是對於很難避免小檔案的場景或者task特別多的場景,這類場景大多數執行時間都很短。jvm的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成千上萬個task任務的情況。

set mapred.job.reuse.jvm.num.tasks=10; 

JVM的一個缺點是,開啟JVM重用將會一直佔用使用到的task插槽,以便進行重用,直到任務完成後才能釋放。如果某個“不平衡“的job中有幾個 reduce task 執行的時間要比其他reduce task消耗的時間多得多的話,那麼保留的插槽就會一直空閒著卻無法被其他的job使用,直到所有的task都結束了才會釋放。

4)設定reduce的數目

reduce個數的設定極大影響任務執行效率,不指定reduce個數的情況下,Hive會猜測確定一個reduce個數,基於以下兩個設定: hive.exec.reducers.bytes.per.reducer(每個reduce任務處理的資料量,在Hive 0.14.0版本之前預設值是1G(1,000,000,000);而從Hive 0.14.0開始,預設值變成了256M(256,000,000) ) hive.exec.reducers.max(每個任務最大的reduce數,在Hive 0.14.0版本之前預設值是999;而從Hive 0.14.0開始,預設值變成了1009 ) 計算reducer數的公式很簡單N=min(引數2,總輸入資料量/引數1) 即,如果reduce的輸入(map的輸出)總大小不超過1G,那麼只會有一個reduce任務;

  調整reduce個數方法一: 調整hive.exec.reducers.bytes.per.reducer引數的值;

set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

  調整reduce個數方法二;

set mapred.reduce.tasks = number

  reduce個數並不是越多越好; 同map一樣,啟動和初始化reduce也會消耗時間和資源; 另外,有多少個reduce,就會有多少個輸出檔案,如果生成了很多個小檔案,那麼如果這些小檔案作為下一個任務的輸入,則也會出現小檔案過多的問題 -

5) 推測執行

   什麼是推測執行? 所謂的推測執行,就是當所有task都開始執行之後,Job Tracker會統計所有任務的平均進度,如果某個task所在的task node機器配置比較低或者CPU load很高(原因很多),導致任務執行比總體任務的平均執行要慢,此時Job Tracker會啟動一個新的任務(duplicate task),原有任務和新任務哪個先執行完就把另外一個kill掉   怎麼配置推測執行引數? 推測執行需要設定Job的兩個引數:     mapred.map.tasks.speculative.execution     mapred.reduce.tasks.speculative.execution 兩個引數的預設值均為true.

四、優化sql

(1)where條件優化 優化前(關係資料庫不用考慮會自動優化): select m.cid,u.id from order m join customer u on( m.cid =u.id )where m.dt='20180808'; 優化後(where條件在map端執行而不是在reduce端執行): select m.cid,u.id from (select * from order where dt='20180818') m join customer u on( m.cid =u.id);

(2)union優化

儘量不要使用union (union 去掉重複的記錄)而是使用 union all 然後在用group by 去重

(3)count distinct優化

不要使用count (distinct   cloumn) ,使用子查詢

select count(1) from (select id from tablename group by id) tmp;

(4) 用in 來代替join

如果需要根據一個表的欄位來約束另為一個表,儘量用in來代替join .

select id,name from tb1  a join tb2 b on(a.id = b.id);

select id,name from tb1 where id in(select id from tb2); in 要比join 快

(5)消滅子查詢內的 group by 、 COUNT(DISTINCT),MAX,MIN。 可以減少job的數量。

  (6) join 優化:

   Common/shuffle/Reduce JOIN 連線發生的階段,發生在reduce 階段, 適用於大表 連線 大表(預設的方式)

    Map join : 連線發生在map階段 , 適用於小表 連線 大表                        大表的資料從檔案中讀取                        小表的資料存放在記憶體中(hive中已經自動進行了優化,自動判斷小表,然後進行快取)

set hive.auto.convert.join=true;  

   SMB join    Sort -Merge -Bucket Join  對大表連線大表的優化,用桶表的概念來進行優化。在一個桶內傳送笛卡爾積連線(需要是兩個統計進行join)

set hive.auto.convert.sortmerge.join=true;  
set hive.optimize.bucketmapjoin = true;  
set hive.optimize.bucketmapjoin.sortedmerge = true;  
set hive.auto.convert.sortmerge.join.noconditionaltask=true;  

五、資料傾斜

表現:任務進度長時間維持在99%(或100%),檢視任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的資料量和其他reduce差異過大。

原因:某個reduce的資料輸入量遠遠大於其他reduce資料的輸入量

1)、key分佈不均勻

2)、業務資料本身的特性

3)、建表時考慮不周

4)、某些SQL語句本身就有資料傾斜

關鍵詞 情形 後果
join 其中一個表較小,但是key集中 分發到某一個或幾個Reduce上的資料遠高於平均值
join 大表與大表,但是分桶的判斷欄位0值或空值過多 這些空值都由一個reduce處理,非常慢
group by group by 維度過小,某值的數量過多 處理某值的reduce非常耗時
count distinct 某特殊值過多 處理此特殊值reduce耗時

解決方案:

(1)引數調節

set hive.map.aggr=true

set hive.groupby.skewindata=true

(2) 熟悉資料的分佈,優化sql的邏輯,找出資料傾斜的原因。

六、合併小檔案

小檔案的產生有三個地方,map輸入,map輸出,reduce輸出,小檔案過多也會影響hive的分析效率:

設定map輸入的小檔案合併

set mapred.max.split.size=256000000;  
//一個節點上split的至少的大小(這個值決定了多個DataNode上的檔案是否需要合併)
set mapred.min.split.size.per.node=100000000;
//一個交換機下split的至少的大小(這個值決定了多個交換機上的檔案是否需要合併)  
set mapred.min.split.size.per.rack=100000000;
//執行Map前進行小檔案合併
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 

設定map輸出和reduce輸出進行合併的相關引數:

//設定map端輸出進行合併,預設為true
set hive.merge.mapfiles = true
//設定reduce端輸出進行合併,預設為false
set hive.merge.mapredfiles = true
//設定合併檔案的大小
set hive.merge.size.per.task = 256*1000*1000
//當輸出檔案的平均大小小於該值時,啟動一個獨立的MapReduce任務進行檔案merge。
set hive.merge.smallfiles.avgsize=16000000

七、檢視sql的執行計劃

explain sql 

學會檢視sql的執行計劃,優化業務邏輯 ,減少job的資料量。

以上為我在工作中的經驗和網上查閱資料所整理出來的hive調優,後面會繼續補充