1. 程式人生 > 實用技巧 >Hadoop基礎(五十一):企業級調優(一)

Hadoop基礎(五十一):企業級調優(一)

1 Fetch 抓取

Fetch 抓取是指,Hive 中對某些情況的查詢可以不必使用 MapReduce 計算。例如: SELECT * FROM employees;在這種情況下,Hive 可以簡單地讀取 employee 對應的儲存目錄下的檔案,然後輸出查詢結果到控制檯。 在 hive-default.xml.template 檔案中 hive.fetch.task.conversion 預設是 more,老版本 hive預設是 minimal,該屬性修改為 more 以後,在全域性查詢、欄位查詢、limit 查詢等都不走mapreduce。
<property>
 <name>hive.fetch.task.conversion</name>
 <value>more</value>
 <description>
 Expects one of [none, minimal, more].
 Some 
select queries can be converted to single FETCH task minimizing latency. Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins. 0. none : disable hive.fetch.task.conversion 1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns) </description> </property>
案例實操: 1)把 hive.fetch.task.conversion 設定成 none,然後執行查詢語句,都會執行 mapreduce程式。
hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from emp;
hive (default
)> select ename from emp; hive (default)> select ename from emp limit 3;
2)把 hive.fetch.task.conversion 設定成 more,然後執行查詢語句,如下查詢方式都不會執行 mapreduce 程式。
hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;

2 本地模式

大多數的 Hadoop Job 是需要 Hadoop 提供的完整的可擴充套件性來處理大資料集的。不過,有時 Hive 的輸入資料量是非常小的。在這種情況下,為查詢觸發執行任務消耗的時間可能會比實際 job 的執行時間要多的多。對於大多數這種情況,Hive 可以通過本地模式在單臺機器上處理所有的任務。對於小資料集,執行時間可以明顯被縮短 使用者可以通過設定 hive.exec.mode.local.auto 的值為 true,來讓 Hive 在適當的時候自動啟動這個優化,預設是 false。
set hive.exec.mode.local.auto=true; //開啟本地 mr
//設定 local mr 的最大輸入資料量,當輸入資料量小於這個值時採用 local mr 的方式,預設134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//設定 local mr 的最大輸入檔案個數,當輸入檔案個數小於這個值時採用 local mr 的方式,默
認為 4
set hive.exec.mode.local.auto.input.files.max=10;
案例實操: 1)開啟本地模式,並執行查詢語句
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from emp cluster by deptno;
Time taken: 1.328 seconds, Fetched: 14 row(s)
2)關閉本地模式,並執行查詢語句
hive (default)> set hive.exec.mode.local.auto=false;
hive (default)> select * from emp cluster by deptno;
Time taken: 20.09 seconds, Fetched: 14 row(s)

3 表的優化

3.1 小表、大表 Join 將 key 相對分散,並且資料量小的表放在 join 的左邊,這樣可以有效減少記憶體溢位錯誤發生的機率;再進一步,可以使用 map join 讓小的維度表(1000 條以下的記錄條數)先進記憶體。在 map 端完成 reduce。 實際測試發現:新版的 hive 已經對小表 JOIN 大表和大表 JOIN 小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。 案例實操 1.需求 測試大表 JOIN 小表和小表 JOIN 大表的效率 2.建大表、小表和 JOIN 後表的語句
// 建立大表
create table bigtable(id bigint, time bigint, uid string, keyword string, 
url_rank int, click_num int, click_url string) row format delimited fields 
terminated by '\t';
// 建立小表
create table smalltable(id bigint, time bigint, uid string, keyword 
string, url_rank int, click_num int, click_url string) row format delimited 
fields terminated by '\t';
// 建立 join 後表的語句
create table jointable(id bigint, time bigint, uid string, keyword 
string, url_rank int, click_num int, click_url string) row format delimited 
fields terminated by '\t';
3.分別向大表和小表中匯入資料
hive (default)> load data local inpath '/opt/module/datas/bigtable' into table 
bigtable;
hive (default)>load data local inpath '/opt/module/datas/smalltable' into table 
smalltable;
4.關閉 mapjoin 功能(預設是開啟的)
set hive.auto.convert.join = false;
5.執行小表 JOIN 大表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable b
on b.id = s.id;
Time taken: 35.921 seconds No rows affected (44.456 seconds) 6.執行大表 JOIN 小表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
left join smalltable s
on s.id = b.id;
Time taken: 34.196 seconds No rows affected (26.287 seconds) 3.2 大表 Join 大表 1.空 KEY 過濾 有時 join 超時是因為某些 key 對應的資料太多,而相同 key 對應的資料都會發送到相同的 reducer 上,從而導致記憶體不夠。此時我們應該仔細分析這些異常的 key,很多情況 下,這些 key 對應的資料是異常資料,我們需要在 SQL 語句中進行過濾。例如 key 對應的欄位為空,操作如下: 案例實操 (1)配置歷史伺服器 配置 mapred-site.xml
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop102:10020</value>
</property>
<property>
 <name>mapreduce.jobhistory.webapp.address</name>
 <value>hadoop102:19888</value>
</property>
啟動歷史伺服器 sbin/mr-jobhistory-daemon.sh start historyserver 檢視 jobhistory http://hadoop102:19888/jobhistory (2)建立原始資料表、空 id 表、合併後資料表
// 建立原始表
create table ori(id bigint, time bigint, uid string, keyword string, url_rank 
int, click_num int, click_url string) row format delimited fields terminated by 
'\t';
// 建立空 id 表
create table nullidtable(id bigint, time bigint, uid string, keyword string,
url_rank int, click_num int, click_url string) row format delimited fields 
terminated by '\t';
// 建立 join 後表的語句
create table jointable(id bigint, time bigint, uid string, keyword string, 
url_rank int, click_num int, click_url string) row format delimited fields 
terminated by '\t';
(3)分別載入原始資料和空 id 資料到對應表中
hive (default)> load data local inpath '/opt/module/datas/ori' into table ori;
hive (default)> load data local inpath '/opt/module/datas/nullid' into tablenullidtable; (4)測試不過濾空 id hive (default)> insert overwrite table jointable select n.* from nullidtable n left join ori o on n.id = o.id; Time taken: 42.038 seconds Time taken: 37.284 seconds (5)測試過濾空 id
hive (default)> insert overwrite table jointable select n.* from (select * from
nullidtable where id is not null ) n left join ori o on n.id = o.id;
Time taken: 31.725 seconds Time taken: 28.876 seconds 2.空 key 轉換 有時雖然某個 key 為空對應的資料很多,但是相應的資料不是異常資料,必須要包含在join 的結果中,此時我們可以表 a 中 key 為空的欄位賦一個隨機的值,使得資料隨機均勻地 分不到不同的 reducer 上。例如: 案例實操: 不隨機分佈空 null 值: (1)設定 5 個 reduce 個數 set mapreduce.job.reduces = 5; (2)JOIN 兩張表
insert overwrite table jointable
select n.* from nullidtable n left join ori b on n.id = b.id;
結果:如圖 6-13 所示,可以看出來,出現了資料傾斜,某些 reducer 的資源消耗遠大於其他 reducer。 圖 6-13 空 key 轉換 隨機分佈空 null 值 (1)設定 5 個 reduce 個數 set mapreduce.job.reduces = 5; (2)JOIN 兩張表
insert overwrite table jointable
select n.* from nullidtable n full join ori o on 
case when n.id is null then concat('hive', rand()) else n.id end = o.id;
結果:如圖 6-14 所示,可以看出來,消除了資料傾斜,負載均衡 reducer 的資源消耗 3.3 MapJoin(小表 join 大表) 如果不指定 MapJoin 或者不符合 MapJoin 的條件,那麼 Hive 解析器會將 Join 操作轉換成 Common Join,即:在 Reduce 階段完成 join。容易發生資料傾斜。可以用 MapJoin 把小 表全部載入到記憶體在 map 端進行 join,避免 reducer 處理。 1.開啟 MapJoin 引數設定 (1)設定自動選擇 Mapjoin
set hive.auto.convert.join = true; 預設為 true
(2)大表小表的閾值設定(預設 25M 一下認為是小表):
set hive.mapjoin.smalltable.filesize=25000000;
2.MapJoin 工作機制,如圖 6-15 所示 圖 6-15 MapJoin 工作機制 案例實操: (1)開啟 Mapjoin 功能
set hive.auto.convert.join = true; 預設為 true
(2)執行小表 JOIN 大表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable b
on s.id = b.id;
Time taken: 24.594 seconds (3)執行大表 JOIN 小表語句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
join smalltable s
on s.id = b.id;
Time taken: 24.315 seconds 3.4 Group By 預設情況下,Map 階段同一 Key 資料分發給一個 reduce,當一個 key 資料過大時就傾斜了。

並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最後在 Reduce 端得出最終結果。 1.開啟 Map 端聚合引數設定 (1)是否在 Map 端進行聚合,預設為 True
set hive.map.aggr = true
(2)在 Map 端進行聚合操作的條目數目
set hive.groupby.mapaggr.checkinterval = 100000
(3)有資料傾斜的時候進行負載均衡(預設是 false)
set hive.groupby.skewindata = true
當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結 果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的資料結果按照 Group By Key 分佈到 Reduce 中(這個過程可以保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。
hive (default)> select deptno from emp group by deptno;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 23.68 sec HDFS Read: 19987 
HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 23 seconds 680 msec
OK
deptno
10
20
30
優化以後
hive (default)> set hive.groupby.skewindata = true;
hive (default)> select deptno from emp group by deptno;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 28.53 sec HDFS Read: 18209 
HDFS Write: 534 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 5 Cumulative CPU: 38.32 sec HDFS Read: 15014 
HDFS Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 1 minutes 6 seconds 850 msec
OK
deptno
10
20
30
3.5 Count(Distinct) 去重統計 資料量小的時候無所謂,資料量大的情況下,由於 COUNT DISTINCT 的全聚合操作,即使設定了 reduce task 個數,set mapred.reduce.tasks=100;hive 也只會啟動一個 reducer。, 這就造成一個 Reduce 處理的資料量太大,導致整個 Job 很難完成,一般 COUNT DISTINCT使用先 GROUP BY 再 COUNT 的方式替換: 案例實操 1.建立一張大表
hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword
string, url_rank int, click_num int, click_url string) row format delimited
fields terminated by '\t';
2.載入資料
hive (default)> load data local inpath '/opt/module/datas/bigtable' into table
bigtable;
3.設定 5 個 reduce 個數
set mapreduce.job.reduces = 5;
4.執行去重 id 查詢
hive (default)> select count(distinct id) from bigtable;
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 7.12 sec HDFS Read: 120741990 
HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 120 msec
OK
c0
100001
Time taken: 23.607 seconds, Fetched: 1 row(s)
5.採用 GROUP by 去重 id
hive (default)> select count(id) from (select id from bigtable group by id) a;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 17.53 sec HDFS Read: 120752703 
HDFS Write: 580 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 4.29 sec HDFS Read: 9409 HDFS 
Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 21 seconds 820 msec
OK
_c0
100001
Time taken: 50.795 seconds, Fetched: 1 row(s)
雖然會多用一個 Job 來完成,但在資料量大的情況下,這個絕對是值得的。 3.6 笛卡爾積 儘量避免笛卡爾積,join 的時候不加 on 條件,或者無效的 on 條件,Hive 只能使用 1 個reducer 來完成笛卡爾積。 3.7 行列過濾 列處理:在 SELECT 中,只拿需要的列,如果有,儘量使用分割槽過濾,少用 SELECT *。行處理:在分割槽剪裁中,當使用外關聯時,如果將副表的過濾條件寫在 Where 後面,那 麼就會先全表關聯,之後再過濾,比如: 案例實操: 1.測試先關聯兩張表,再用 where 條件過濾
hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 34.406 seconds, Fetched: 100 row(s) 2.通過子查詢後,再關聯表
hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;
Time taken: 30.058 seconds, Fetched: 100 row(s) 3.8 動態分割槽調整 關係型資料庫中,對分割槽表 Insert 資料時候,資料庫自動會根據分割槽欄位的值,將資料插入到相應的分割槽中,Hive 中也提供了類似的機制,即動態分割槽(Dynamic Partition),只不 過,使用 Hive 的動態分割槽,需要進行相應的配置。 1.開啟動態分割槽引數設定 (1)開啟動態分割槽功能(預設 true,開啟) hive.exec.dynamic.partition=true (2)設定為非嚴格模式(動態分割槽的模式,預設 strict,表示必須指定至少一個分割槽為靜態分割槽,nonstrict 模式表示允許所有的分割槽欄位都可以使用動態分割槽。)
hive.exec.dynamic.partition.mode=nonstrict
(3)在所有執行 MR 的節點上,最大一共可以建立多少個動態分割槽。預設 1000
hive.exec.max.dynamic.partitions=1000
(4)在每個執行 MR 的節點上,最大可以建立多少個動態分割槽。該引數需要根據實際的資料來設定。比如:源資料中包含了一年的資料,即 day 欄位有 365 個值,那麼該引數就 需要設定成大於 365,如果使用預設值 100,則會報錯。
hive.exec.max.dynamic.partitions.pernode=100
(5)整個 MR Job 中,最大可以建立多少個 HDFS 檔案。預設 100000
hive.exec.max.created.files=100000
(6)當有空分割槽生成時,是否丟擲異常。一般不需要設定。預設 false
hive.error.on.empty.partition=false
2.案例實操 需求:將 dept 表中的資料按照地區(loc 欄位),插入到目標表 dept_partition 的相應分割槽中。 (1)建立目標分割槽表
hive (default)> create table dept_partition(id int, name string) partitioned
by (location int) row format delimited fields terminated by '\t';
(2)設定動態分割槽
set hive.exec.dynamic.partition.mode = nonstrict;
hive (default)> insert into table dept_partition partition(location) select deptno, dname, loc from dept;
(3)檢視目標分割槽表的分割槽情況
hive (default)> show partitions dept_partition;
思考:目標分割槽表是如何匹配到分割槽欄位的? 3.9 分桶 詳見 6.6 章。 3.10 分割槽 詳見 4.6 章