Hive:select count(distinct)優化以及hive.groupby.skewindata
Hive:select count(distinct)優化以及hive.groupby.skewindata
原文連結:https://juejin.cn/post/6926536667877048333
問題引入
資料分析師小A接到需求,需要統計當日各個省份20歲以下的日活躍使用者數(去重統計user_id,即UV)
現有一個Hive表儲存著使用者行為資料:user_behaviour_trace_info
列 | 描述 |
---|---|
user_id | 使用者id |
nickname | 暱稱 |
age | 年齡 |
province | 省份 |
url | 訪問地址 |
access_time | 訪問時間 |
device_id | 使用者手機裝置id |
小A很順其自然的寫出這段SQL:
select
province,
count(distinct user_id) as uv
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
group by
province
複製程式碼
立馬提交SQL開始執行任務,一頓操作猛如虎,一看時長十點五(小時)
心想不愧是使用者行為資料,資料量居然這麼大?那讓我們看看任務各個Task的執行耗時:
以下三個JobHistory截圖屬於另一個select count(distinct)資料傾斜任務,具有代表意義
我們可以觀察到
- 任務整體耗時:10小時11分鐘:
- Map Task 平均耗時:1分16秒
- Reduce Task 平均耗時:1分59秒
任務執行時間長,MR Task 平均耗時短,極有可能是出現了資料傾斜!
那我們繼續看看Map Task的執行情況,按Map Task耗時倒序排序
Map Task最長耗時為2分49秒,而且整體看起來執行耗時相差不大,問題不在Map階段
接下來看看Reduce階段,按Reduce Task耗時倒序排序
好傢伙,有一個Reduce Task執行了10個小時,另個一執行了近2小時,其餘Reduce Task的執行時間很短。
說好了大家一起幹活,最終卻只有我一個人扛下了所有?
那麼,問題出在哪裡?
我們先要弄明白 Hive 是如何執行這段 SQL 的
Hive SQL 最終要轉化成 MapReducer 任務,在邏輯上可以細分為三個階段:
- Map階段:將 group by 欄位作為 key,聚合函式中的列作為 Value,輸出鍵值對
- Shuffle階段:對 Map 階段輸出的鍵值對 Key 進行 Hash,按照Hash值將鍵值對傳送至各個 Reducer 中(相同的 Key 會分配給同一個 Reducer)
- Reduce階段:執行聚合操作
簡而言之:SQL 中的 Group By 欄位會決定某條資料最終落在哪一個 Reducer 上處理。
下文將 group by 的欄位稱之為 group_by_column
那麼,對於剛剛那段SQL,group_by_column 是 province,同一個 province 的資料會分配給同一個 Reducer,在 Reduce 階段,對 user_id 進行去重統計
然鵝,我國共有34個省級行政區域,一個 Reducer 處理一個省的資料,最多也只能有 34 個 Reducer 同時處理資料
當然,多個省份還可能落在同一個 Reducer 中
如何優雅的解決問題?
我們先來分析一下最初那段SQL的本質:
- Map + Shuffle階段:按 province 將資料分發給 Reducer
- Reduce階段:對同一個 province 的 user_id 先去重,再計數
Reduce 階段任務最重,執行了去重和計數兩個操作:
- 去重:在 province 內,對 user_id 去重
- 計數:統計 province 內 user_id 的個數
SQL任務慢的原因是:同一個 province 的資料全部由同一個 Reducer 處理
思考一下,不難發現:
- 可以通過 group by 實現快速去重
- 計數操作可以由多個Task分別計數,最終再彙總結果
那麼優化思路就不言而喻了:將去重和計數兩個操作分開,並且用多個Task同時計數,最終再彙總所有Task的計數資料
hive.groupby.skewindata引數
其實 Hive 早就考慮到這個場景,並且貼心的提供了 hive.groupby.skewindata 引數。
當hive.groupby.skewindata = true時,Hive 會將原來的一個 MaReduce 階段轉化成兩個 MapReduce 階段:
- 一階段MapReduce:隨機打散資料,打散後進行區域性聚合(資料去重 + 多Task區域性計數)
- 二階段MapReduce:對一階段的區域性聚合結果進行最終聚合(最終彙總計數)
這樣的描述看起來有點雲裡霧裡,那不妨讓我們自己通過手動優化來更加深入的理解這個引數。手動優化的思路和原理和hive.groupby.skewindata = true是一致的
第一步:資料去重
我們先實現第一步,在每個 province 中,對 user_id 進行去重
SQL很簡單,但有一些需要注意的點:
- 去重效能:group by 的去重效能要比 select distinct 要好,所以使用 group by 去重
- 資料過濾:因為要計算的 uv 指標有條件,所以需要對資料進行過濾
- null值:因為 count(distinc user_id) 不會計算 user_id 為 null 的資料,所以在去重時需要過濾 null 值
那麼我們可以寫出這段SQL
select
province,
user_id,
cast(rand() * 1000 as int) as random_key -- 隨機數,作用稍後解釋~
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20 -- uv統計條件
and user_id is not null -- count(distinc)不統計null值
group by -- 對資料進行去重
province,
user_id
複製程式碼
聰明的同學已經看出來了,這裡除了對資料進行去重外,還多了一個隨機數字段。這個隨機數字段是用來做什麼的呢,繼續往下看你就知道了~
第二步:打散資料,計算區域性聚合結果
資料去重完畢後,只需要統計每個 province 的 user_id 個數就能得到對應 province 的 uv 指標!
由上文提到,group_by_column 決定了資料怎麼分發給 Reducer
同一個 group_by_column 的資料會分配給同一個 Reducer
那麼我們該如何讓多個 Reducer 同時計算某個 province 的 user_id 個數呢?這裡就可以使用去重階段“多出來”的隨機數 random_key!
select
province,
random_key,
sum(1) as partial_uv -- 對 user_id 進行計數,是區域性聚合結果
from (
select -- 子查詢是第一步SQL:資料去重
province,
user_id,
cast(rand() * 1000 as int) as random_key
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
and user_id is not null
group by
province,
user_id
) t1
group by -- 對隨機數也進行 group by,讓多個 Reducer 一起統計資料
province,
random_key
複製程式碼
使用組合鍵"province + random_key"進行 group by,同一個 province 的資料會隨機分發給多個 Reducer
每個 Reducer 對 user_id 進行計數,獲得區域性聚合結果
任務執行過程如下:
第一步 + 第二步 就相當於是 hive.groupby.skewindata = true 時的一階段Mapreduce
第三步:最終聚合
在第二步中,我們已經將同一個 province 的 user_id 分成多個部分,並且統計出了每個部分的 user_id 數量(partial_uv)
那麼接下來,我們只要對區域性聚合結果進行簡單的相加就可以了
最終SQL如下:
select
province,
sum(partial_aggregation) as uv -- 最終聚合結果就是 count(distinct user_id)
from (
select -- 第二步SQL:打散資料,計算區域性聚合結果
province,
random_key,
sum(1) as partial_uv
from ( -- 第一步SQL:資料去重
select
province,
user_id,
cast(rand() * 1000 as int) as random_key
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
and user_id is not null
group by
province,
user_id
) t1
group by
province,
random_key
) t2
group by
province
複製程式碼
進階:如何優化多列 count(distinct)
hive.groupby.skewindata 對 count(distinct) 的優化是有限制的,當 hive.groupby.skewindata = true 時,SQL只能對一個列進行 count(distinct),否則會丟擲異常:
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: DISTINCT on different columns not supported with skew in data
複製程式碼
其實這很容易理解,在剛剛的手動優化過程中,我們能夠很容易發現,這個方法不能同時對多個列進行 去重+計數 得出各自的 count(distinct) 值
主要原因:無法在某一個維度裡,同時對多個列進行去重
count(distinct)優化方案不能直接套用在計算多列的情況上,但可以採用分治的思想,對每個列單獨計算 count(distinct),然後再將結果進行合併
案例
現在有個需求,需要按省份分別去重統計當日 user_id 和 device_id 的去重數量,要求使用者年齡為20歲以下
優化前SQL:
select
province,
count(distinct user_id) as uv,
count(distinct device_id) as dv
from
user_behaviour_trace_info
where
access_time = '今天' and age < 20
group by
province
複製程式碼
優化方案
- 第一步:單獨計算 uv、dv:Job1、Job2
- 第二步:合併計算結果:Job3
值得注意的是,最外層 SELECT 使用 COALESCE() 是因為:在單獨計算某個 count(distinct) 時,可能因為添加了統計條件(年齡小於20歲),而導致 province 沒有對應的取值,left join 時指標為 null
雖然寫起來比原SQL要麻煩些,但效率吊打原SQL不知道多少倍
筆者曾經將一個9小時耗時的任務,通過該方法優化至15分鐘~