1. 程式人生 > 其它 >Hive:select count(distinct)優化以及hive.groupby.skewindata

Hive:select count(distinct)優化以及hive.groupby.skewindata

Hive:select count(distinct)優化以及hive.groupby.skewindata

原文連結:https://juejin.cn/post/6926536667877048333

問題引入

資料分析師小A接到需求,需要統計當日各個省份20歲以下的日活躍使用者數(去重統計user_id,即UV)

現有一個Hive表儲存著使用者行為資料:user_behaviour_trace_info

描述
user_id 使用者id
nickname 暱稱
age 年齡
province 省份
url 訪問地址
access_time 訪問時間
device_id 使用者手機裝置id

小A很順其自然的寫出這段SQL:

select
  province,
  count(distinct user_id) as uv
from
  user_behaviour_trace_info
where
  access_time = '今天' and age < 20
group by
  province
複製程式碼

立馬提交SQL開始執行任務,一頓操作猛如虎,一看時長十點五(小時)

心想不愧是使用者行為資料,資料量居然這麼大?那讓我們看看任務各個Task的執行耗時:

以下三個JobHistory截圖屬於另一個select count(distinct)資料傾斜任務,具有代表意義

我們可以觀察到

  • 任務整體耗時:10小時11分鐘:
  • Map Task 平均耗時:1分16秒
  • Reduce Task 平均耗時:1分59秒

任務執行時間長,MR Task 平均耗時短,極有可能是出現了資料傾斜!

那我們繼續看看Map Task的執行情況,按Map Task耗時倒序排序

Map Task最長耗時為2分49秒,而且整體看起來執行耗時相差不大,問題不在Map階段

接下來看看Reduce階段,按Reduce Task耗時倒序排序

好傢伙,有一個Reduce Task執行了10個小時,另個一執行了近2小時,其餘Reduce Task的執行時間很短。

說好了大家一起幹活,最終卻只有我一個人扛下了所有?

那麼,問題出在哪裡?

我們先要弄明白 Hive 是如何執行這段 SQL 的

Hive SQL 最終要轉化成 MapReducer 任務,在邏輯上可以細分為三個階段:

  1. Map階段:將 group by 欄位作為 key,聚合函式中的列作為 Value,輸出鍵值對
  2. Shuffle階段:對 Map 階段輸出的鍵值對 Key 進行 Hash,按照Hash值將鍵值對傳送至各個 Reducer 中(相同的 Key 會分配給同一個 Reducer)
  3. Reduce階段:執行聚合操作

簡而言之:SQL 中的 Group By 欄位會決定某條資料最終落在哪一個 Reducer 上處理。

下文將 group by 的欄位稱之為 group_by_column

那麼,對於剛剛那段SQL,group_by_column 是 province,同一個 province 的資料會分配給同一個 Reducer,在 Reduce 階段,對 user_id 進行去重統計

然鵝,我國共有34個省級行政區域,一個 Reducer 處理一個省的資料,最多也只能有 34 個 Reducer 同時處理資料

當然,多個省份還可能落在同一個 Reducer 中

如何優雅的解決問題?

我們先來分析一下最初那段SQL的本質:

  1. Map + Shuffle階段:按 province 將資料分發給 Reducer
  2. Reduce階段:對同一個 province 的 user_id 先去重,再計數

Reduce 階段任務最重,執行了去重和計數兩個操作:

  • 去重:在 province 內,對 user_id 去重
  • 計數:統計 province 內 user_id 的個數

SQL任務慢的原因是:同一個 province 的資料全部由同一個 Reducer 處理

思考一下,不難發現:

  1. 可以通過 group by 實現快速去重
  2. 計數操作可以由多個Task分別計數,最終再彙總結果

那麼優化思路就不言而喻了:將去重和計數兩個操作分開,並且用多個Task同時計數,最終再彙總所有Task的計數資料

hive.groupby.skewindata引數

其實 Hive 早就考慮到這個場景,並且貼心的提供了 hive.groupby.skewindata 引數。

當hive.groupby.skewindata = true時,Hive 會將原來的一個 MaReduce 階段轉化成兩個 MapReduce 階段:

  1. 一階段MapReduce:隨機打散資料,打散後進行區域性聚合(資料去重 + 多Task區域性計數)
  2. 二階段MapReduce:對一階段的區域性聚合結果進行最終聚合(最終彙總計數)

這樣的描述看起來有點雲裡霧裡,那不妨讓我們自己通過手動優化來更加深入的理解這個引數。手動優化的思路和原理和hive.groupby.skewindata = true是一致的

第一步:資料去重

我們先實現第一步,在每個 province 中,對 user_id 進行去重

SQL很簡單,但有一些需要注意的點:

  1. 去重效能:group by 的去重效能要比 select distinct 要好,所以使用 group by 去重
  2. 資料過濾:因為要計算的 uv 指標有條件,所以需要對資料進行過濾
  3. null值:因為 count(distinc user_id) 不會計算 user_id 為 null 的資料,所以在去重時需要過濾 null 值

那麼我們可以寫出這段SQL

select
  province,
  user_id,
  cast(rand() * 1000 as int) as random_key -- 隨機數,作用稍後解釋~
from
  user_behaviour_trace_info
where
  access_time = '今天' and age < 20 -- uv統計條件
  and user_id is not null -- count(distinc)不統計null值
group by -- 對資料進行去重
  province,
  user_id
複製程式碼

聰明的同學已經看出來了,這裡除了對資料進行去重外,還多了一個隨機數字段。這個隨機數字段是用來做什麼的呢,繼續往下看你就知道了~

第二步:打散資料,計算區域性聚合結果

資料去重完畢後,只需要統計每個 province 的 user_id 個數就能得到對應 province 的 uv 指標!

由上文提到,group_by_column 決定了資料怎麼分發給 Reducer

同一個 group_by_column 的資料會分配給同一個 Reducer

那麼我們該如何讓多個 Reducer 同時計算某個 province 的 user_id 個數呢?這裡就可以使用去重階段“多出來”的隨機數 random_key!

select
  province,
  random_key,
  sum(1) as partial_uv -- 對 user_id 進行計數,是區域性聚合結果
from (
  select -- 子查詢是第一步SQL:資料去重
    province,
    user_id,
    cast(rand() * 1000 as int) as random_key
  from
    user_behaviour_trace_info
  where
    access_time = '今天' and age < 20
    and user_id is not null
  group by
    province,
    user_id
) t1
group by -- 對隨機數也進行 group by,讓多個 Reducer 一起統計資料
  province,
  random_key
複製程式碼

使用組合鍵"province + random_key"進行 group by,同一個 province 的資料會隨機分發給多個 Reducer

每個 Reducer 對 user_id 進行計數,獲得區域性聚合結果

任務執行過程如下:

第一步 + 第二步 就相當於是 hive.groupby.skewindata = true 時的一階段Mapreduce

第三步:最終聚合

在第二步中,我們已經將同一個 province 的 user_id 分成多個部分,並且統計出了每個部分的 user_id 數量(partial_uv)

那麼接下來,我們只要對區域性聚合結果進行簡單的相加就可以了

最終SQL如下:

select
  province,
  sum(partial_aggregation) as uv -- 最終聚合結果就是 count(distinct user_id)
from (
  select -- 第二步SQL:打散資料,計算區域性聚合結果
    province,
    random_key,
    sum(1) as partial_uv
  from ( -- 第一步SQL:資料去重
    select
      province,
      user_id,
      cast(rand() * 1000 as int) as random_key
    from
      user_behaviour_trace_info
    where
      access_time = '今天' and age < 20
      and user_id is not null
    group by
      province,
      user_id
  ) t1
  group by
    province,
    random_key
) t2
group by
  province
複製程式碼

進階:如何優化多列 count(distinct)

hive.groupby.skewindata 對 count(distinct) 的優化是有限制的,當 hive.groupby.skewindata = true 時,SQL只能對一個列進行 count(distinct),否則會丟擲異常:

Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: DISTINCT on different columns not supported with skew in data
複製程式碼

其實這很容易理解,在剛剛的手動優化過程中,我們能夠很容易發現,這個方法不能同時對多個列進行 去重+計數 得出各自的 count(distinct) 值

主要原因:無法在某一個維度裡,同時對多個列進行去重

count(distinct)優化方案不能直接套用在計算多列的情況上,但可以採用分治的思想,對每個列單獨計算 count(distinct),然後再將結果進行合併

案例

現在有個需求,需要按省份分別去重統計當日 user_id 和 device_id 的去重數量,要求使用者年齡為20歲以下

優化前SQL:

select
  province,
  count(distinct user_id) as uv,
  count(distinct device_id) as dv
from
  user_behaviour_trace_info
where
  access_time = '今天' and age < 20
group by
  province
複製程式碼
優化方案
  • 第一步:單獨計算 uv、dv:Job1、Job2
  • 第二步:合併計算結果:Job3

值得注意的是,最外層 SELECT 使用 COALESCE() 是因為:在單獨計算某個 count(distinct) 時,可能因為添加了統計條件(年齡小於20歲),而導致 province 沒有對應的取值,left join 時指標為 null

雖然寫起來比原SQL要麻煩些,但效率吊打原SQL不知道多少倍

筆者曾經將一個9小時耗時的任務,通過該方法優化至15分鐘~