Hive SQL 綜合應用案例實戰及多項效能指標深入講解-DW商業環境實戰
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。QQ郵箱地址:[email protected],如有任何技術交流,可隨時聯絡。
1:order by, sort by, distribute by, cluster by
-
1.1 order by
hive 中的 order by 語句會對查詢結果做一次全域性排序,即,所有的 mapper 產生的結果都會交給一個 reducer 去處理,無論資料量大小, job 任務只會啟動一個 reducer,如果資料量巨大,則會耗費大量的時間。 提示: 如果在嚴格模式下, order by 需要指定 limit 資料條數,不然資料量巨大的情況下會造成崩潰無輸出結果。涉及屬性: set hive.mapred.mode=nonstrict/strict
select * from company_info order by money desc; 複製程式碼
-
1.2 sort by
hive 中的 sort by 語句會對每一塊區域性資料進行區域性排序,即,每一個 reducer 處理的資料都是有序的,但是不能保證全域性有序。
-
1.3 distribute by
hive 中的 distribute by 一般要和 sort by 一起使用,即將某一塊資料歸給(distribute by)某一個reducer 處理,然後在指定的 reducer 中進行 sort by 排序。
提示: distribute by 必須寫在 sort by 之前
提示: 涉及屬性
mapreduce.job.reduces, hive.exec.reducers.bytes.per.reducer 例如:不同的人(personId)分為不同的組,每組按照 money 排序
select * from company_info distribute by personId sort by personId, money desc; 複製程式碼
-
1.4 cluster by
hive 中的 cluster by 在 distribute by 和 sort by 排序欄位一致的情況下是等價的。 同時, cluster by 指定的列只能是降序,即預設的 descend,而不能是 ascend。 例如: 寫一個等價於 distribute by 與 sort by 的例子
select * from company_info distribute by personId sort by personId; select * from compnay_info cluster by personId; 複製程式碼
2: 行轉列、列轉行(UDAF 與 UDTF)
2.1 行轉列(concat_ws)
create table person_info(
name string,
constellation string,
blood_type string)
row format delimited fields terminated by "\t";
load data local inpath “person_info.tsv” into table person_info;
#collect_set(t1.name) 表示把分組後的多行值轉化為集合
select
t1.base,
concat_ws('|', collect_set(t1.name)) name
from
(select
name,
concat(constellation, ",", blood_type) base
from
person_info) t1
group by
t1.base;
複製程式碼
2.2 列轉行(array< string >陣列結構)
create table movie_info(
movie string,
category array<string>)
row format delimited fields terminated by "\t"
collection items terminated by ",";
load data local inpath "movie_info.tsv" into table movie_info;
複製程式碼
-
將電影分類中的陣列資料展開
select movie, category_name from movie_info lateral view explode(category) table_tmp as category_name; 複製程式碼
-
“fields terminated by”:欄位與欄位之間的分隔符。
-
“collection items terminated by”:一個欄位中各個子元素 item的分隔符
-
orc 即 Optimized Row Columnar (ORC) file,在 RCFile 的基礎上演化而來,可以提供一種高 效的方法在 Hive 中儲存資料, 提升了讀、寫、 處理資料的效率。
2.3 分桶
- 直接分桶
開始操作之前,需要將 hive.enforce.bucketing 屬性設定為 true,以標識 Hive 可以識別桶。
create table music(
id int,
name string,
size float)
row format delimited
fields terminated by "\t"
clustered by (id) into 4 buckets;
複製程式碼
- 在分割槽中分桶
當資料量過大,需要龐大分割槽數量時,可以考慮桶,因為分割槽數量太大的情況可能會導致文 件系統掛掉,而且桶比分割槽有更高的查詢效率。 資料最終落在哪一個桶裡,取決於 clustered by 的那個列的值的 hash 數與桶的個數求餘來決定。 雖然有一定離散性, 但不能保證每個桶 中的資料量是一樣的。
create table music2(
id int,
name string,
size float)
partitioned by (date string)
clustered by (id) sorted by(size) into 4 bucket
row format delimited
fields terminated by "\t";
load data local inpath 'demo/music.txt' into table music2 partition(date='2017-08-30');
複製程式碼
3 Hive綜合專案:
- 版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。QQ郵箱地址:[email protected],如有任何技術交流,可隨時聯絡。
3.1 專案欄位說明
3.2 使用者表
3.3 資料集
3.4 使用者資料集
3.5 資料初步ETL
package com.z.youtube.util;
public class ETLUtils {
/**
* 1、過濾不合法資料
* 2、去掉&符號左右兩邊的空格
* 3、 \t 換成&符號
* @param ori
* @return
*/
public static String getETLString(String ori){
String[] splits = ori.split("\t");
//1、過濾不合法資料
if(splits.length < 9) return null;
//2、去掉&符號左右兩邊的空格
splits[3] = splits[3].replaceAll(" ", "");
StringBuilder sb = new StringBuilder();
//3、 \t 換成&符號
for(int i = 0; i < splits.length; i++){
sb.append(splits[i]);
if(i < 9){
if(i != splits.length - 1){
sb.append("\t");
}
}else{
if(i != splits.length - 1){
sb.append("&");
}
}
}
return sb.toString();
}
}
複製程式碼
3.6 資料模型建模
-
建立原始表: youtube_ori, youtube_user_ori
#youtube_ori create table youtube_ori( videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int, relatedId array<string>) row format delimited fields terminated by "\t" collection items terminated by "&" stored as textfile; #youtube_user_ori: create table youtube_user_ori( uploader string, videos int, friends int) clustered by (uploader) into 24 buckets row format delimited fields terminated by "\t" stored as textfile; 複製程式碼
-
建立ORC表: youtube_orc, youtube_user_orc
#youtube_orc create table youtube_orc( videoId string, uploader string, age int, category array<string>, length int, views int, rate float, ratings int, comments int, relatedId array<string>) clustered by (uploader) into 8 buckets row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc; #youtube_user_orc: create table youtube_user_orc( uploader string, videos int, friends int) clustered by (uploader) into 24 buckets row format delimited fields terminated by "\t" stored as orc; load data inpath "/youtube/output/video/2008/0222" into table youtube_ori; load data inpath "/youtube/user/2008/0903" into table youtube_user_ori; insert into table youtube_orc select * from youtube_ori; insert into table youtube_user_orc select * from youtube_user_ori; 複製程式碼
4 業務分析
4.1 統計視訊觀次數 Top10
使用 order by 按照 views 欄位做一個全域性排序即可,同時我們設定只顯示前 10 條。
select
videoId,
uploader,
age,
category,
length,
views,
rate,
ratings,
comments
from
youtube_orc
order by
views
desc limit
複製程式碼
4.2 統計視訊類別熱度 Top10
-
- 即統計每個類別有多少個視訊,顯示出包含視訊最多的前 10 個類別。
-
- 我們需要按照類別 group by 聚合,然後 count 組內的 videoId 個數即可。
-
- 因為當前表結構為:一個視訊對應一個或多個類別。所以如果要 group by 類別,需要先 將類別進行列轉行(展開),然後再進行 count 即可。
-
-
最後按照熱度排序,顯示前 10 條。
select category_name as category, count(t1.videoId) as hot from ( select videoId, category_name from youtube_orc lateral view explode(category) t_catetory as category_name) t1 group by t1.category_name order by hot desc limit 10; 複製程式碼
-
4.3 統計出視訊(觀看數最高的 20 個視訊)的所屬類別以及類別包含(這 Top20 視訊的個數)
-
- 先找到觀看數最高的 20 個視訊所屬條目的所有資訊,降序排列
-
- 把這 20 條資訊中的 category 分裂出來(列轉行)
-
-
最後查詢視訊分類名稱和該分類下有多少個 Top20 的視訊
select category_name as category, count(t2.videoId) as hot_with_views from ( select videoId, category_name from ( select * from youtube_orc order by views desc limit 20) t1 lateral view explode(category) t_catetory as category_name) t2 group by category_name order by hot_with_views desc; 複製程式碼
-
4.4 統計視訊觀看數 Top50 所關聯視訊的所屬類別的熱度排名
-
-
查詢出觀看數最多的前 50 個視訊的所有資訊(當然包含了每個視訊對應的關聯視訊),記 為臨時表 t1
t1:觀看數前 50 的視訊 select * from youtube_orc order by views desc limit 50; 複製程式碼
-
-
-
將找到的 50 條視訊資訊的相關視訊 relatedId 列轉行,記為臨時表 t2
select explode(relatedId) as videoId from t1; 複製程式碼
-
-
-
將相關視訊的 id 和 youtube_orc 表進行 inner join 操作
t5:得到兩列資料,一列是 category,一列是之前查詢出來的相關視訊 id (select distinct(t2.videoId), t3.category from t2 inner join youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name; 複製程式碼
-
-
-
按照視訊類別進行分組,統計每組視訊個數,然後排行
select category_name as category, count(t5.videoId) as hot from ( select videoId, category_name from ( select distinct(t2.videoId), t3.category from ( select explode(relatedId) as videoId from ( select * from youtube_orc order by views desc limit 50) t1) t2 inner join youtube_orc t3 on t2.videoId = t3.videoId) t4 lateral view explode(category) t_catetory as category_name) t5 group by category_name order by hot desc; 複製程式碼
-
4.5 統計每個類別中的視訊熱度 Top10,以 Music 為例
-
-
要想統計 Music類別中的視訊熱度 Top10,需要先找到 Music類別,那麼就需要將 category 展開,所以可以建立一張表用於存放 categoryId 展開的資料。
create table youtube_category( videoId string, uploader string, age int, categoryId string, length int, views int, rate float, ratings int, comments int, relatedId array<string>) row format delimited fields terminated by "\t" collection items terminated by "&" stored as orc; 複製程式碼
-
-
-
向 category 展開的表中插入資料。
insert into table youtube_category select videoId, uploader, age, categoryId, length, views, rate, ratings, comments, relatedId from youtube_orc lateral view explode(category) catetory as categoryId; 複製程式碼
-
-
-
統計對應類別(Music)中的視訊熱度。
select videoId, views from youtube_category where categoryId = "Music" order by views desc limit 10; 複製程式碼
-
4.6 統計每個類別中視訊流量Top10,以 Music 為例
-
- 建立視訊類別展開表(categoryId 列轉行後的表)
-
-
按照 ratings 排序即可
select videoId, views, ratings from youtube_category where categoryId = "Music" order by ratings desc limit 10;
-
4.7 統計上傳視訊最多的使用者 Top10 以及他們上傳的觀看次數在前 20 的視訊
-
-
先找到上傳視訊最多的 10 個使用者的使用者資訊
select * from youtube_user_orc order by videos desc limit 10; 複製程式碼
-
-
-
通過 uploader 欄位與 youtube_orc 表進行 join,得到的資訊按照 views 觀看次數進行排序 即可。
select t2.videoId, t2.views, t2.ratings, t1.videos, t1.friends from ( select * from youtube_user_orc order by videos desc limit 10) t1 join youtube_orc t2 on t1.uploader = t2.uploader order by views desc limit 20; 複製程式碼
-
4.8 統計每個類別視訊觀看數 Top10
-
- 先得到 categoryId 展開的表資料
-
- 子查詢按照 categoryId 進行分割槽,然後分割槽內排序,並生成遞增數字,該遞增數字這一 列起名為 rank 列
-
-
通過子查詢產生的臨時表,查詢 rank 值小於等於 10 的資料行即可。
select t1.* from ( select videoId, categoryId, views, row_number() over(partition by categoryId order by views desc) rank from youtube_category) t1 where rank <= 10;
-
5 JVM 堆記憶體溢位
描述: java.lang.OutOfMemoryError: Java heap space
Error: Java heap space堆疊空間太小了,在mapred-site.xml中設定
<name>mapred.child.java.opts</name>
<value>-Xmx200m</value>
如果是新版本在這裡在修改中 hadoop-env.sh
export HADOOP_HEAPSIZE=2000
複製程式碼
解決: 在 yarn-site.xml 中加入如下程式碼
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx1024m</value>
</property>
複製程式碼
6 Hive效能優化詳解
- 列裁剪
Hive在讀資料的時候,可以只讀取查詢中所需要用到的列,而忽略其他的列。這樣做可以節省讀取開銷,中間表儲存開銷和資料整合開銷。
set hive.optimize.cp=true; // 預設為true
Hive 在讀資料的時候,可以只讀取查詢中所需要用到的列,而忽略其它列。 例如,若有以下查詢:
SELECT a,b FROM q WHERE e<10;
在實施此項查詢中,Q 表有 5 列(a,b,c,d,e),Hive 只讀取查詢邏輯中真實需要 的 3 列 a、b、e,
而忽略列 c,d;這樣做節省了讀取開銷,中間表儲存開銷和資料整合開銷。
裁剪所對應的引數項為:hive.optimize.cp=true(預設值為真)
複製程式碼
-
分割槽裁剪
在查詢的過程中只選擇需要的分割槽,可以減少讀入的分割槽數目,減少讀入的資料量。 set hive.optimize.pruner=true; // 預設為true 複製程式碼
-
join優化:在進行join的時候,大表放在最後面,但是使用 /+streamtable(大表名稱)/ 來標記大表,那麼大表放在什麼位置都行了
select /*+streamtable(s)*/ s.ymd,d.dividend from stocks s inner join dividends d on s.ymd=d.ymd and s.symbol=d.symbol where s.symbol=’aapl’ 複製程式碼
-
在hive中,當對3個或更多張表進行join時,如果on條件使用相同欄位,那麼它們會合併為一個MapReduce Job,利用這種特性,可以將相同的join on的放入一個job來節省執行時間。
-
優先過濾資料,儘量減少每個階段的資料量,對於分割槽表能用上分割槽欄位的儘量使用,同時只選擇後面需要使用到的列,最大限度的減少參與join的資料量。
-
啟用mapjoin,mapjoin是將join雙方比較小的表直接分發到各個map程序的記憶體中,在map程序中進行join操作,這樣就可以不用進行reduce步驟,從而提高了速度。只有join操作才能啟用mapjoin。
set hive.auto.convert.join = true; // 是否根據輸入小表的大小,自動將reduce端的common join 轉化為map join,將小表刷入記憶體中。 set hive.mapjoin.smalltable.filesize = 2500000; // 刷入記憶體表的大小(位元組) set hive.mapjoin.maxsize=1000000; // Map Join所處理的最大的行數。超過此行數,Map Join程序會異常退出 複製程式碼
-
儘量原子操作,儘量避免一個SQL包含複雜的邏輯,可以使用中間表來完成複雜的邏輯。
-
並行執行, hive會將一個查詢任務轉化為一個或多個階段。預設情況下,一次只執行一個階段。如果某些階段不是互相依賴的,是可以並行執行的,這樣可以縮短整個job執行時間。
set hive.exec.parallel=true; // 可以開啟併發執行。 set hive.exec.parallel.thread.number=16; // 同一個sql允許最大並行度,預設為8。 複製程式碼
-
中間資料壓縮,中間資料壓縮就是對hive查詢的多個job之間的資料進行壓縮。最好是選擇一個節省CPU耗時的壓縮方式。可以採用snappy壓縮演算法,該演算法的壓縮和解壓效率都非常高。開啟中間壓縮(map輸出結果(臨時的)壓縮) 。
set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK; 複製程式碼
-
結果資料壓縮
最終的結果資料(Reducer輸出資料)也是可以進行壓縮的,可以選擇一個壓縮效果比較好的,可以減少資料的大小和資料的磁碟讀寫時間;
注:常用的gzip,snappy壓縮演算法是不支援並行處理的,如果資料來源是gzip/snappy壓縮檔案大檔案,這樣只會有有個mapper來處理這個檔案,會嚴重影響查詢效率。所以如果結果資料需要作為其他查詢任務的資料來源,可以選擇支援splitable的LZO演算法,這樣既能對結果檔案進行壓縮,還可以並行的處理,這樣就可以大大的提高job執行的速度了。關於如何給Hadoop叢集安裝LZO壓縮庫可以檢視這篇文章。
set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; set mapred.output.compression.type=BLOCK: 如何給hadoop安裝壓縮庫 https://mp.weixin.qq.com/s?__biz=MzU5OTM5NjQ2NA==&mid=2247483676&idx=1&sn=2a14972e97bc6c25647e962c12ce3e77&chksm=feb4d803c9c35115c66017e077fdc4b613515d3b93206d62400213bd5ab79ae46f125333db15&scene=21#wechat_redirect Hadoop叢集支援一下演算法: org.apache.hadoop.io.compress.GzipCodec org.apache.hadoop.io.compress.SnappyCodec com.hadoop.compression.lzo.LzopCodec org.apache.hadoop.io.compress.Lz4Codec 複製程式碼
-
本地化執行
對於小資料集,可以通過本地模式,在單臺機器上處理所有任務,執行時間明顯被縮短 set mapred.job.tracker=local; set hive.exec.mode.local.auto=true; 當一個job滿足下面條件才能真正使用本地模式: job的輸入資料大小必須小於引數hive.exec.mode.local.inputbytes.max(預設128M) job的map數必須小於引數hive.exec.mode.local.auto.tasks.max(預設4) job的reduce數必須為0或者1 複製程式碼
-
Map端聚合優化
hive.map.aggr=true; // 用於設定是否在 map 端進行聚合,預設值為真 hive.groupby.mapaggr.checkinterval=100000; // 用於設定 map 端進行聚合操作的條目數 複製程式碼
-
合併小檔案
在執行MapReduce程式的時候,一般情況是一個檔案需要一個mapper來處理。但是如果資料來源是大量的小檔案,這樣豈不是會啟動大量的mapper任務,這樣會浪費大量資源。可以將輸入的小檔案進行合併,從而減少mapper任務數量.
Hadoop小檔案問題解決方案 https://mp.weixin.qq.com/s?__biz=MzU5OTM5NjQ2NA==&mid=2247483659&idx=1&sn=28a7d3e2c0bd87fa4239719b1b360aed&chksm=feb4d814c9c35102ad9f018342307e3fe86f06cfeb14d522bf563b0c61bba061907065490eba&scene=21#wechat_redirect set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; // Map端輸入、合併檔案之後按照block的大小分割(預設) set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; // Map端輸入,不合並 一個檔案起一個Map 複製程式碼
-
控制map任務數量
1)減少mapper數可以通過合併小檔案來實現,增加mapper數可以通過控制上一個reduce。
輸入檔案總大小:total_size hdfs設定的資料量大小:dfs_block_size default_mapper_num=total_size/dfs_block_size set mapred.map.tasks=10; 複製程式碼
2)那如果我們需要減少mapper數量,但是檔案大小是固定的,那該怎麼辦呢? 可以通過mapred.min.split.size設定每個任務處理的檔案的大小,這個大小隻有在大於dfs_block_size的時候才會生效。
split_size=max(mapred.min.split.size, dfs_block_size) split_num=total_size/split_size compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks)) 複製程式碼
3)總結一下控制mapper個數的方法:
(1)如果想增加mapper個數,可以設定mapred.map.tasks為一個較大的值 (2)如果想減少mapper個數,可以設定maperd.min.split.size為一個較大的值 (3)如果輸入是大量小檔案,想減少mapper個數,可以通過設定 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;合併小檔案。 複製程式碼
-
控制reducer數量
1)如果reducer數量過多,一個reducer會產生一個結果檔案,這樣就會生成很多小檔案,那麼如果這些結果檔案會作為下一個job的輸入,則會出現小檔案需要進行合併的問題,而且啟動和初始化reducer需要耗費和資源。
2)如果reducer數量過少,這樣一個reducer就需要處理大量的資料,並且還有可能會出現資料傾斜的問題,使得整個查詢耗時長。
預設情況下,hive分配的reducer個數由下列引數決定: 引數1:hive.exec.reducers.bytes.per.reducer(預設1G) 引數2:hive.exec.reducers.max(預設為999) reducer的計算公式為: N=min(引數2, 總輸入資料量/引數1) 可以通過改變上述兩個引數的值來控制reducer的數量,也可以通過 set mapred.map.tasks=10; 直接控制reducer個數,如果設定了該引數,上面兩個引數就會忽略。 複製程式碼
-
group by資料傾斜優化
在實際業務中,通常是資料集中在某些點上,這樣在進行資料分組的時候,某一些組上資料量非常大,而其他的分組上資料量很小,在MapReduce程式中,同一個分組的資料會分配到同一個reduce上進行操作,這樣會導致某一些reduce壓力很大,一些reduce壓力很小,這就是資料傾斜,整個job執行時間取決於那個執行最慢的那個reduce。
set hive.groupby.skewindata=false; //決定 group by 操作是否支援傾斜的資料。注意:只能對單個欄位聚合 當上面選項設定為true的時候,生成的查詢任務會生成兩個MapReduce Job。 第一個Job,map的輸出結果會隨機分佈到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的 group by key有可能被分發到不同的reduce中,從而達到負載均衡的目的; 第二個Job再根據預處理的資料結果按照 group by key分佈到reduce中,這個過程可以保證相同的 group by key被分佈到同一個reduce中,最後完成最終的聚合操作。 複製程式碼
-
JVM重用
1)hadoop預設配置是使用派生JVM來執行Map和Reduce任務的,JVM的啟動過程會造成相當大的開銷。尤其是執行的job包含成千上萬個task任務的情況。
2)JVM重用可以使得JVM例項在同一個job中重新使用N次,N的值可以在hadoop的配置檔案mapred-site.xml檔案中進行配置。
set mapred.job.reuse.jvm.num.tasks=20; 複製程式碼
3)JVM重用也是有缺點的,開啟JVM重用會一直佔用使用到的task的插槽,以便進行重用,知道任務完成後才會釋放。 如果某個不平衡的job中有幾個reduce task執行的時間要比其他的reduce task消耗的時間要多得多的話,那麼保留的插槽就會一直空閒卻無法被其他的job使用,直到所有的task都結束了才會釋放。
-
列式儲存
建立表的時候,可以設定成orc/parquet列式儲存格式。因為列式儲存的表,每一列的資料在物理上是儲存在一起的,Hive在查詢的時候只會遍歷需要的列資料,從而可以大大減少處理的資料量。
7 總結
hive優化請優先過濾資料,啟用mapjoin,Map端聚合優化,group by資料傾斜優化,JVM重用相對更有意義。
秦凱新 於深圳
版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。QQ郵箱地址:[email protected],如有任何技術交流,可隨時聯絡。