1. 程式人生 > 其它 >Hive系列(五)深入理解

Hive系列(五)深入理解

深入理解Hive

分割槽與分桶

Hive分割槽

在Hive Select查詢中一般會掃描整個表內容,會消耗很多時間做沒必要的工作。有時候只需要掃描表中關心的一部分資料,因此建表時引入了partition概念。分割槽表指的是在建立表時指定的partition的分割槽空間。分割槽是指按照資料表的某列或某些列分為多個區,區從形式上可以理解為資料夾,比如我們要收集某個大型網站的日誌資料,一個網站每天的日誌資料存在同一張表上,由於每天會生成大量的日誌,導致資料表的內容巨大,在查詢時進行全表掃描耗費的資源非常多。那其實這個情況下,我們可以按照日期對資料表進行分割槽,不同日期的資料存放在不同的分割槽,在查詢時只要指定分割槽欄位的值就可以直接從該分割槽查詢。

1、建立分割槽表插入資料
單分割槽表

create table if not exists user_info_partition_single(
user_id string comment '使用者id',
user_name string comment '使用者姓名',
user_gender string comment '使用者性別',
user_age int comment '使用者年齡'
)
comment '使用者資訊表'
partitioned by(province string)
row format delimited fields terminated by ','
lines terminated by '\n' ;

插入資料

insert overwrite table user_info_partition_single partition(province='BeiJing')
select '11' as user_id,
'YangZi' as user_name,
'F' as user_gender,
28 as user_age
;

多分割槽表

create table if not exists user_info_partition_multiple(
user_id string comment '使用者id',
user_name string comment
'使用者姓名', user_gender string comment '使用者性別', user_age int comment '使用者年齡' ) comment '使用者資訊表' partitioned by(province string,city string) row format delimited fields terminated by ',' lines terminated by '\n' ;

插入資料

insert overwrite table user_info_partition_multiple partition(province='JangSu',city='SuZhou')
select '11' as user_id,
'YangZi' as user_name,
'F' as user_gender,
28 as user_age
;

2、檢視分割槽目錄表

[[email protected] hive-2.3.6]# hdfs dfs -ls /hive/warehouse/test.db/user_info_partition_single
Found 1 items
drwxrwxrwx   - root supergroup          0 2021-01-06 19:12 /hive/warehouse/test.db/user_info_partition_single/province=BeiJing
[[email protected] hive-2.3.6]# hdfs dfs -ls /hive/warehouse/test.db/user_info_partition_multiple
Found 1 items
drwxrwxrwx   - root supergroup          0 2021-01-13 11:37 /hive/warehouse/test.db/user_info_partition_multiple/province=JangSu
[[email protected] hive-2.3.6]# hdfs dfs -ls /hive/warehouse/test.db/user_info_partition_multiple/province=JangSu
Found 1 items
drwxrwxrwx   - root supergroup          0 2021-01-13 11:37 /hive/warehouse/test.db/user_info_partition_multiple/province=JangSu/city=SuZhou

通過檢視分割槽表在檔案系統中位置,我們會發現分割槽所依據的列反應在檔案路徑上,單分割槽表下面有個溫檔案目錄province=BeiJing,多分割槽表下面有個province=JiangSu目錄,province=JiangSu目錄下面有個city=SuZhou的目錄。

重點強調,所謂分割槽,這是將滿足某些條件的記錄打包,做個記號,在查詢時提高效率,相當於按資料夾對檔案進行分類,資料夾名可類比分割槽欄位。這個分割槽欄位形式上存在於資料表中,在查詢時會顯示到客戶端上,但並不真正在儲存在資料表文件中,是所謂偽列。所以,千萬不要以為是對屬性表中真正存在的列按照屬性值的異同進行分割槽。比如上面的分割槽依據的列province並不真正的存在於資料表中,是我們為了方便管理新增的一個偽列,這個列的值也是我們人為規定的,不是從資料表中讀取之後根據值的不同將其分割槽。我們並不能按照某個資料表中真實存在的列來分割槽。

Hive分桶

分桶是相對分割槽進行更細粒度的劃分。分桶進行區分,Hive採用對列值雜湊,然後將某列屬性值的hash值除以桶的個數求餘的方式決定該條記錄存放在哪個桶當中。例如按照province屬性分為3個桶,就是對province屬性值的hash值對3取摸,按照取模結果對資料分桶。如取模結果為0的資料記錄存放到一個檔案,取模為1的資料存放到一個檔案,取模為2的資料存放到一個檔案。

1、分桶的場景
分割槽提供了一個隔離資料和優化查詢的便利方式,不過並非所有的資料都可形成合理的分割槽,尤其是需要確定合適大小的分區劃分方式,(不合理的資料分區劃分方式可能導致有的分割槽資料過多,而某些分割槽沒有什麼資料的尷尬情況)
分桶是將資料集分解為更容易管理的若干部分的另一種技術。

2、把表(或者分割槽)組織成桶(Bucket)有兩個理由:
(1)獲得更高的查詢處理效率。桶為表加上了額外的結構,Hive 在處理有些查詢時能利用這個結構。具體而言,連線兩個在(包含連線列的)相同列上劃分了桶的表,可以使用 Map 端連線 (Map-side join)高效的實現。比如JOIN操作。對於JOIN操作兩個表有一個相同的列,如果對這兩個表都進行了桶操作。那麼將儲存相同列值的桶進行JOIN操作就可以,可以大大較少JOIN的資料量。

(2)使取樣(sampling)更高效。在處理大規模資料集時,在開發和修改查詢的階段,如果能在資料集的一小部分資料上試執行查詢,會帶來很多方便。

3、、如何分桶
1)分桶之前要執行命令hive.enforce.bucketiong=true;
2)使用關鍵字clustered by 指定分割槽依據的列名,指定分為多少桶
3)與分割槽不同的是,分割槽依據的不是真實資料表文件中的列,而是我們指定的偽列,但是分桶是依據資料表中真實的列而不是偽列。所以在指定分割槽依據的列的時候要指定列的型別,因為在資料表文件中不存在這個列,相當於新建一個列。而分桶依據的是表中已經存在的列,這個列的資料型別顯然是已知的,所以不需要指定列的型別。

建立表格

create table if not exists user_info_bucket (
user_id string comment '使用者id',
user_name string comment '使用者姓名',
user_gender string comment '使用者性別',
user_age int comment '使用者年齡'
)
clustered by (user_id) into 3 buckets
row format delimited fields terminated by ',' lines terminated by '\n'
;

插入資料

load data local inpath '/home/huangwei/input/user_info.txt' into table user_info_bucket;
Error: Error while compiling statement: FAILED: SemanticException Please load into an intermediate table and use 'insert... select' to allow Hive to enforce bucketing. Load into bucketed tables are disabled for safety reasons. If you know what you are doing, please sethive.strict.checks.bucketing to false and that hive.mapred.mode is not set to 'strict' to proceed. Note that if you may get errors or incorrect results if you make a mistake while using some of the unsafe features. (state=42000,code=40000)

分桶表不能採用load資料方式插入資料,需要將資料先匯入一個intermediate table中,採用"insert…select"方式插入資料

insert overwrite table user_info_bucket select user_id,user_name,user_gender,user_age from user_info_external;

4、檢視分桶資訊

hdfs dfs -ls /hive/warehouse/test.db/user_info_bucket
Found 3 items
-rwxrwxrwx   1 root supergroup         44 2021-01-13 14:34 /hive/warehouse/test.db/user_info_bucket/000000_0
-rwxrwxrwx   1 root supergroup         61 2021-01-13 14:34 /hive/warehouse/test.db/user_info_bucket/000001_0
-rwxrwxrwx   1 root supergroup         47 2021-01-13 14:34 /hive/warehouse/test.db/user_info_bucket/000002_0

可以看到分3個桶就是將資料表由一個檔案儲存分為3個檔案儲存

5、對桶資料進取樣
桶的個數從1開始計數。因此,前面的查詢從4個桶的第一個中獲取所有的使用者。 對於一個大規模的、均勻分佈的資料集,這會返回表中約四分之一的資料行。我們 也可以用其他比例對若干個桶進行取樣(因為取樣並不是一個精確的操作,因此這個 比例不一定要是桶數的整數倍)。

select * from user_info_bucket tablesample(bucket 1 out of 3 on user_id);
+---------------------------+-----------------------------+-------------------------------+----------------------------+
| user_info_bucket.user_id  | user_info_bucket.user_name  | user_info_bucket.user_gender  | user_info_bucket.user_age  |
+---------------------------+-----------------------------+-------------------------------+----------------------------+
| 9                         | LuHan                       | M                             | 30                         |
| 6                         | JinChen                     | F                             | 30                         |
| 3                         | ZhengKai                    | M                             | 34                         |
+---------------------------+-----------------------------+-------------------------------+----------------------------+

tablesample是抽樣語句,語法:TABLESAMPLE(BUCKET x OUTOF y)
y必須是table總bucket數的倍數或者因子。hive根據y的大小,決定抽樣的比例。例如,table總共分了64份,當y=32時,抽取(64/32=)2個bucket的資料,當y=128時,抽取(64/128=)1/2個bucket的資料。x表示從哪個bucket開始抽取。例如,table總bucket數為32,tablesample(bucket 3 out of 16),表示總共抽取(32/16=)2個bucket的資料,分別為第3個bucket和第(3+16=)19個bucket的資料。

分割槽又分桶

可以在分割槽的基礎上再分桶

create table if not exists user_info_partition_bucket (
user_id string comment '使用者id',
user_name string comment '使用者姓名',
user_gender string comment '使用者性別',
user_age int comment '使用者年齡'
)
partitioned by(province string,city string)
clustered by (user_id) into 3 buckets
row format delimited fields terminated by ',' lines terminated by '\n'
;

insert overwrite table user_info_partition_bucket partition(province='JangSu',city='SuZhou') select user_id,user_name,user_gender,user_age from person;

檢視分割槽分桶資訊

 hdfs dfs -ls /hive/warehouse/test.db/user_info_partition_bucket/province=JangSu/city=SuZhou/
Found 3 items
-rwxrwxrwx   1 root supergroup         40 2021-01-13 15:32 /hive/warehouse/test.db/user_info_partition_bucket/province=JangSu/city=SuZhou/000000_0
-rwxrwxrwx   1 root supergroup         31 2021-01-13 15:32 /hive/warehouse/test.db/user_info_partition_bucket/province=JangSu/city=SuZhou/000001_0
-rwxrwxrwx   1 root supergroup         25 2021-01-13 15:32 /hive/warehouse/test.db/user_info_partition_bucket/province=JangSu/city=SuZhou/000002_0

檢視分桶資料

select * from user_info_partition_bucket tablesample(bucket 1 out of 3 on user_id);
+-------------------------------------+---------------------------------------+-----------------------------------------+--------------------------------------+--------------------------------------+----------------------------------+
| user_info_partition_bucket.user_id  | user_info_partition_bucket.user_name  | user_info_partition_bucket.user_gender  | user_info_partition_bucket.user_age  | user_info_partition_bucket.province  | user_info_partition_bucket.city  |
+-------------------------------------+---------------------------------------+-----------------------------------------+--------------------------------------+--------------------------------------+----------------------------------+
|                                     | NULL                                  | NULL                                    | NULL                                 | JangSu                               | SuZhou                           |
| 6                                   | AiLi                                  | F                                       | 18                                   | JangSu                               | SuZhou                           |
| 3                                   | WangCuiHua                            | F                                       | 21                                   | JangSu                               | SuZhou                           |
+-------------------------------------+---------------------------------------+-----------------------------------------+--------------------------------------+--------------------------------------+----------------------------------+

Join底層MapReduce實現

Hive中的Join可分為兩種情況
Common Join (Reduce階段完成Join)
Map Join (Map階段完成Join)

Common Join

如果沒有開啟hive.auto.convert.join=true或者不符合MapJoin的條件,那麼Hive解析器會將Join操作轉換成Common Join,在Reduce階段完成join,並且整個過程包含Map、Shuffle、Reduce階段。
1、Map階段
讀取表的資料,Map輸出以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合為key。
Map輸出的value為Join之後需要輸出或者作為條件的列;同時在value中還會包含表的Tag資訊,用於標明此value對應的表,按照key進行排序。
2、Shuffle階段
根據key取雜湊值,並將key/value按照雜湊值分發到不同的reduce中
3、Reduce階段
根據key完成Join操作,並且通過Tag來識別不同表中的資料。在合併過程中,把表編號扔掉。
舉例

drop table if exists wedw_dwd.user_info_df;
 CREATE TABLE wedw_dwd.user_info_df(
  user_id    string  COMMENT '使用者id',
  user_name  string  COMMENT '使用者姓名'
 )
row format delimited fields terminated by '\t'
 STORED AS textfile
 ;
 
 +----------+-------------+
| user_id  | user_name    |
+----------+-------------+
| 1        | 小紅         |
| 2        | 小明         |
| 3        | 小花         |
+----------+-------------+

 drop table if exists wedw_dwd.order_info_df;
 CREATE TABLE wedw_dwd.order_info_df(
  user_id      string  COMMENT '使用者id',
  course_name  string  COMMENT '課程名稱'
 )
row format delimited fields terminated by '\t'
 STORED AS textfile
 ;
 
 +----------+--------------+
| user_id  | course_name  |
+----------+---------------+
| 1        | spark        |
| 2        | flink        |
| 3        | java         |
+----------+--------------+
select
 t1.user_id
,t1.user_name
,t2.course_name
from
wedw_dwd.user_info_df t1
join wedw_dwd.order_info_df t2
on t1.user_id = t2.user_id
;
+----------+------------+---------------+
| user_id  | user_name  | course_name  |
+----------+------------+---------------+
| 1        | 小紅         | spark        |
| 2        | 小明         | flink        |
| 3        | 小花         | java         |
+----------+------------+---------------+

圖解:(在合併過程中,把表編號扔掉)
在這裡插入圖片描述

Map Join

1、什麼是Map Join
MapJoin顧名思義,就是在Map階段進行表之間的連線。而不需要進入到Reduce階段才進行連線。這樣就節省了在Shuffle階段時要進行的大量資料傳輸。從而起到了優化作業的作用。

2、MapJoin的原理:
通常情況下,要連線的各個表裡面的資料會分佈在不同的Map中進行處理。即同一個Key對應的Value可能存在不同的Map中。這樣就必須等到Reduce中去連線。

要使MapJoin能夠順利進行,那就必須滿足這樣的條件:除了一份表的資料分佈在不同的Map中外,其他連線的表的資料必須在每個Map中有完整的拷貝。

3、MapJoin適用的場景:
通過上面分析你會發現,並不是所有的場景都適合用MapJoin. 它通常會用在如下的一些情景:在二個要連線的表中,有一個很大,有一個很小,這個小表可以存放在記憶體中而不影響效能。
這樣我們就把小表文件複製到每一個Map任務的本地,再讓Map把檔案讀到記憶體中待用。
不等值的連結操作

4、MapJoin的實現方法:

 1)在Map-Reduce的驅動程式中使用靜態方法DistributedCache.addCacheFile()增加要拷貝的小表文件,。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的檔案拷貝到各個TaskTracker的本地磁碟上。

 2)在Map類的setup方法中使用DistributedCache.getLocalCacheFiles()方法獲取檔案目錄,並使用標準的檔案讀寫API讀取相應的檔案。

5、Hive內建提供的優化機制之一就包括MapJoin
在Hive v0.7之前,需要使用hint提示 /*+ mapjoin(table) */才會執行MapJoin 。Hive v0.7之後的版本已經不需要給出MapJoin的指示就進行優化。它是通過如下配置引數來控制的:

hive> set hive.auto.convert.join=true;

Hive還提供另外一個引數–表文件的大小作為開啟和關閉MapJoin的閾值。

hive.mapjoin.smalltable.filesize=25000000 即25M

示例:

select f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802)  

該語句中B表有30億行記錄,A表只有100行記錄,而且B表中資料傾斜特別嚴重,有一個key上有15億行記錄,在執行過程中特別的慢,而且在reduece的過程中遇有記憶體不夠而報錯。
解決方法:MAPJION會把小表全部讀入記憶體中,在map階段直接拿另外一個表的資料和記憶體中表資料做匹配,由於在map是進行了join操作,省去了reduce執行的效率也會高很多

select /*+ mapjoin(A)*/ f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802) 

mapjoin還有一個很大的好處是能夠進行不等連線的join操作,如果將不等條件寫在where中,那麼mapreduce過程中會進行笛卡爾積,執行效率特別低,

如果使用mapjoin操作,在map的過程中就完成了不等值的join操作,效率會高很多。

select A.a ,A.b from A join B where A.a>B.a

高階聚合函式

資料準備

廣東,廣州,白雲區,100,30
廣東,廣州,番禺區,120,45
廣東,深圳,福田區,200,67
廣東,深圳,南山區,290,10
浙江,杭州,蕭山區,80,20
浙江,杭州,濱江區,120,50
浙江,寧波,江東區,80,23
浙江,寧波,江北區,45,5

建立表載入資料

create table if not exists area_gdp_df
(
province_name string comment '省份名稱',
city_name string comment '城市名稱',
area_name string comment '地區名稱',
people_cnt int comment '人口數量',
amt decimal(16,2) comment 'GDP'
)
row format delimited fields terminated by ','
;

with cube

分組組合最全,包含所有可能的組合

select province_name,city_name,area_name,sum(people_cnt) all_people_cnt from area_gdp_df group by province_name,city_name,area_name with cube;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+----------------+------------+------------+-----------------+
| province_name  | city_name  | area_name  | all_people_cnt  |
+----------------+------------+------------+-----------------+
| NULL           | NULL       | NULL       | 1035            |
| NULL           | NULL       | 南山區        | 290             |
| NULL           | NULL       | 江東區        | 80              |
| NULL           | NULL       | 江北區        | 45              |
| NULL           | NULL       | 濱江區        | 120             |
| NULL           | NULL       | 番禺區        | 120             |
| NULL           | NULL       | 白雲區        | 100             |
| NULL           | NULL       | 福田區        | 200             |
| NULL           | NULL       | 蕭山區        | 80              |
| NULL           | 寧波         | NULL       | 125             |
| NULL           | 寧波         | 江東區        | 80              |
| NULL           | 寧波         | 江北區        | 45              |
| NULL           | 廣州         | NULL       | 220             |
| NULL           | 廣州         | 番禺區        | 120             |
| NULL           | 廣州         | 白雲區        | 100             |
| NULL           | 杭州         | NULL       | 200             |
| NULL           | 杭州         | 濱江區        | 120             |
| NULL           | 杭州         | 蕭山區        | 80              |
| NULL           | 深圳         | NULL       | 490             |
| NULL           | 深圳         | 南山區        | 290             |
| NULL           | 深圳         | 福田區        | 200             |
| 廣東             | NULL       | NULL       | 710             |
| 廣東             | NULL       | 南山區        | 290             |
| 廣東             | NULL       | 番禺區        | 120             |
| 廣東             | NULL       | 白雲區        | 100             |
| 廣東             | NULL       | 福田區        | 200             |
| 廣東             | 廣州         | NULL       | 220             |
| 廣東             | 廣州         | 番禺區        | 120             |
| 廣東             | 廣州         | 白雲區        | 100             |
| 廣東             | 深圳         | NULL       | 490             |
| 廣東             | 深圳         | 南山區        | 290             |
| 廣東             | 深圳         | 福田區        | 200             |
| 浙江             | NULL       | NULL       | 325             |
| 浙江             | NULL       | 江東區        | 80              |
| 浙江             | NULL       | 江北區        | 45              |
| 浙江             | NULL       | 濱江區        | 120             |
| 浙江             | NULL       | 蕭山區        | 80              |
| 浙江             | 寧波         | NULL       | 125             |
| 浙江             | 寧波         | 江東區        | 80              |
| 浙江             | 寧波         | 江北區        | 45              |
| 浙江             | 杭州         | NULL       | 200             |
| 浙江             | 杭州         | 濱江區        | 120             |
| 浙江             | 杭州         | 蕭山區        | 80              |
+----------------+------------+------------+-----------------+

grouping sets

自定義維度,根據需要分組即可。

select province_name,city_name,sum(people_cnt) all_people_cnt from area_gdp_df group by province_name,city_name grouping sets((province_name,city_name));
+----------------+------------+-----------------+
| province_name  | city_name  | all_people_cnt  |
+----------------+------------+-----------------+
| 廣東             | 廣州         | 220             |
| 廣東             | 深圳         | 490             |
| 浙江             | 寧波         | 125             |
| 浙江             | 杭州         | 200             |
+----------------+------------+-----------------+

select province_name,city_name,sum(people_cnt) all_people_cnt from area_gdp_df group by province_name,city_name grouping sets((province_name,city_name),(province_name));
+----------------+------------+-----------------+
| province_name  | city_name  | all_people_cnt  |
+----------------+------------+-----------------+
| 廣東             | NULL       | 710             |
| 廣東             | 廣州         | 220             |
| 廣東             | 深圳         | 490             |
| 浙江             | NULL       | 325             |
| 浙江             | 寧波         | 125             |
| 浙江             | 杭州         | 200             |
+----------------+------------+-----------------+

rollup

rollup的各維度組合應滿足,前一維度為null後一位維度必須為null,前一維度取非null時,下一維度隨意,即層次維度

select province_name,city_name,area_name,sum(people_cnt) all_people_cnt from area_gdp_df group by province_name,city_name,area_name with rollup;
+----------------+------------+------------+-----------------+
| province_name  | city_name  | area_name  | all_people_cnt  |
+----------------+------------+------------+-----------------+
| NULL           | NULL       | NULL       | 1035            |
| 廣東             | NULL       | NULL       | 710             |
| 廣東             | 廣州         | NULL       | 220             |
| 廣東             | 廣州         | 番禺區        | 120             |
| 廣東             | 廣州         | 白雲區        | 100             |
| 廣東             | 深圳         | NULL       | 490             |
| 廣東             | 深圳         | 南山區        | 290             |
| 廣東             | 深圳         | 福田區        | 200             |
| 浙江             | NULL       | NULL       | 325             |
| 浙江             | 寧波         | NULL       | 125             |
| 浙江             | 寧波         | 江東區        | 80              |
| 浙江             | 寧波         | 江北區        | 45              |
| 浙江             | 杭州         | NULL       | 200             |
| 浙江             | 杭州         | 濱江區        | 120             |
| 浙江             | 杭州         | 蕭山區        | 80              |
+----------------+------------+------------+-----------------+