1. 程式人生 > >Hadoop中的資料傾斜整理

Hadoop中的資料傾斜整理

最近幾次被問到關於資料傾斜的問題,這裡找了些資料也結合一些自己的理解.

     在平行計算中我們總希望分配的每一個task 都能以差不多的粒度來切分並且完成時間相差不大,但是叢集中可能硬體不同,應用的型別不同和切分的資料大小不一致總會導致有部分任務極大的拖慢了整個任務的完成時間,硬體不同就不說了,應用的型別不同其中就比如page rank 或者data mining 裡面一些計算,它的每條記錄消耗的成本不太一樣,這裡只討論關於關係型運算的(一般能用SQL表述的) 資料切分上的資料傾斜問題.

     Hadoop 中資料傾斜會極大影響效能的一個背景是mapreduce 框架中總是不分條件的進行sort . 在通用情況下map sort + partition +reduce sort 可以得到結果,但是這個過程不一定是最優的.  對於關係型計算,其中資料傾斜影響最大的地方在reduce 的sort , reduce 處理的資料量的大小如果超過給定的reduce jvm 的大小的2倍不到的閾值的時候(這個閾值是我猜測的,具體以實際監控執行情況為準),reduce 端會發生multi-pass merge sort 的情況, 這個時候觀察這些執行較慢的reduce task 的metrics 會發現reduce 跟IO 相關的metrics 會比其他reduce 大很多. 具體的細節參考今年

hadoop summit 上Todd 的performance tuning 的ppt (26頁):

這種在reduce 端不分條件的排序只是hadoop 是這種實現,並不是mapreduce 框架一定需要排序,其他的mapreduce 實現或者其他的分散式計算框架可能在reduce 上的這種瓶頸會小一些, 比如shark 裡面的group by 就是基於hash 而不是sort 的.

     對於關係型的計算中常見的資料傾斜有兩種:group by 和 join , 其他有可能的有:

in或exists 的操作尤其是in或exists 作為subquery 的返回(in 或exists 有時候會變成left semi join),

有相同輸入源的union 或union all 也許也會有(其他集合型別的操作intersect 之類也許也是). 

Hive 中的udtf 也算一種.

這裡只討論最常見的group by 和join 的情況.

資料分佈:

     正常的資料分佈理論上都是傾斜的,就是我們所說的20-80原理:80%的財富集中在20%的人手中, 80%的使用者只使用20%的功能 , 20%的使用者貢獻了80%的訪問量 , 不同的資料欄位可能的資料傾斜一般有兩種情況:

一種是唯一值非常少,極少數值有非常多的記錄值(唯一值少於幾千)

一種是唯一值比較多,這個欄位的某些值有遠遠多於其他值的記錄數,但是它的佔比也小於百分之一或千分之一

分割槽:

常見的mapreduce分割槽方式為hash 和range ,

hash partition 的好處是比較彈性,跟資料型別無關,實現簡單(設定reduce個數就好,一般不需要自己實現)

range partition 需要實現者自己瞭解資料分佈, 有時候需要手工做sample取樣. 同時也不夠彈性, 表現在幾個方面,1. 對同一個表的不同欄位都需要實現不同的range partition,  對於時間這種欄位根據查詢型別的不同或者過濾條件的不同切分range 的大小都不一定.

2 .有時候可能設計使用多個欄位組合的情況, 這時候又不能使用之前單個欄位的partition 類, 並且多個欄位組合之間有可能有隱含的聯絡,比如出生日期和星座,商品和季節.

3. 手工做sample 非常耗時間,需要使用者對查詢使用的資料集的分佈有領域知識.

4. 分配方式是死的,reduce 個數是確定的,一旦某種情況下發生傾斜,調整引數

其他的分割槽型別還有Hbase 的hregionpartitioner  或者totalorder partitioner  等.

能夠想到的關於資料傾斜的一些解決方式(歡迎補充,尤其是有沒有做搜尋或者資料探勘的朋友有碰到類似問題):

1. 增加reduce 的jvm記憶體

2. 增加reduce 個數

3. customer partition

4. 其他優化的討論.

5. reduce sort merge排序演算法的討論

6. 正在實現中的hive skewed join.

7. pipeline

8. distinct

9. index 尤其是bitmap index

方式1:既然reduce 本身的計算需要以合適的記憶體作為支援,在硬體環境容許的情況下,增加reduce 的記憶體大小顯然有改善資料傾斜的可能,這種方式尤其適合資料分佈第一種情況,單個值有大量記錄, 這種值的所有紀錄已經超過了分配給reduce 的記憶體,無論你怎麼樣分割槽這種情況都不會改變. 當然這種情況的限制也非常明顯, 1.記憶體的限制存在,2.可能會對叢集其他任務的執行產生不穩定的影響.

方式2:  這個對於資料分佈第二種情況有效,唯一值較多,單個唯一值的記錄數不會超過分配給reduce 的記憶體. 如果發生了偶爾的資料傾斜情況,增加reduce 個數可以緩解偶然情況下的某些reduce 不小心分配了多個較多記錄數的情況. 但是對於第一種資料分佈無效.

方式3: 一種情況是某個領域知識告訴你資料分佈的顯著型別,比如hadoop definitive guide 裡面的溫度問題,一個固定的組合(觀測站點的位置和溫度) 的分佈是固定的, 對於特定的查詢如果前面兩種方式都沒用,實現自己的partitioner 也許是一個好的方式.

方式4: 目前有的一些針對資料傾斜的優化比如pig 的skewed join

pig 文件上面說是根據資料輸入的統計資訊來確定分割槽(也就是range partition?),另外不清楚這個行為是否是動態執行時候才決定的,也就是執行之前有一步pig 自動做sample 的工作,因為pig 是沒有統計資訊這一說的.

hive 中的group by

<property> 
   <name>hive.groupby.skewindata</name> 
   <value>false</value> 
   <description>Whether there is skew in data to optimize group by queries</description> 
</property> 
<property> 
   <name>hive.optimize.groupby</name> 
   <value>true</value> 
   <description>Whether to enable the bucketed group by from bucketed partitions / tables.</description> 
</property>

<property> 
   <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name> 
   <value>0.3</value> 
   <description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description> 
</property> 
<property> 
   <name>hive.groupby.mapaggr.checkinterval</name> 
   <value>100000</value> 
   <description>Number of rows after which size of the grouping keys/aggregation classes is performed</description> 
</property>

其中最後一個引數hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 相似, in-memeory combiner  是發生在mapper 端sort 之前,而不是現在的combiner發生在mapper sort 之後甚至在寫入磁碟之後重新讀磁碟然後排序合併. in-memeory combiner 最早好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介紹ppt 裡面好像提到它們也有這個優化. mapper 端減少資料的機會比reduce 端的要大,所以一般不會看到reduce 端的combiner 的討論,但是這種思路也有,比如google tenzing 的join 討論裡面有一個prev-next 的小優化就是基於reduce 端的combiner, 但那個前提是基於block shuffle 實現的基礎上,資料已經排過序了,所以join 時候前一條資料跟後一條資料相同的概率很大.

hive 中的skewed join :  之前的文章已經介紹過兩表join 中hive 的幾個優化,其中的skewed join 的類似思路就是上面介紹的skewed 的第二種:增加reduce 的個數,hive 中是通過判斷閾值如果大於一個reduce 需要處理的資料量,重新起額外的task 來處理這些超額的reduce 本身需要處理的資料, 這是一種較晚的補救措施,本身hive 開始分割槽的時候已經傾斜(partition 的方式不合理), 當執行的時候通過執行時監控reduce 發現傾斜的特殊key 然後額外的起task 去處理,效果比較一般,感興趣的同學可以參考HIVE-3086 裡面我和facebook 團隊對這種優化思路的討論. 第六節我會討論一下我所認為的思路和facebook 正在做的思路之間的差別.

方式5 :  reduce 分配的記憶體遠小於處理的資料量時,會產生multi-pass sort 的情況是瓶頸,那麼就要問

1. 這種排序是有必要的嘛?

2. 是否有其他排序演算法或優化可以根據特定情況降低他瓶頸的閾值?

3. map reduce 適合處理這種情況嘛?

關於問題1. 如果是group by , 那麼對於資料分佈情況1 ,hash 比sort 好非常多,即使某一個reduce 比其他reduce 處理多的多的資料,hash 的計算方式也不會差距太大.

問題2. 一個是如果實現block shuffle 肯定會極大的減少排序本身的成本, 另外,如果分割槽之後的reduce 不是使用copy –> sort-merge –> reduce 的計算方式, 在copy 之後將每個block 的頭部資訊儲存在記憶體中,不用sort – merge 也可以直接計算reduce, 只不過這時候變成了隨機訪問,而不是現在的sort-merge 之後的順序訪問. block shuffle 的實現有兩種型別,一種是當hadoop 中真正有了列資料格式的時候,資料有更大的機會已經排過序並且按照block 來切分,一般block 為1M ( 可以關注avro-806 )  , 這時候的mapper 什麼都不做,甚至連計算分割槽的開銷都小了很多倍,直接進入reduce 最後一步,第二種型別為沒有列資料格式的支援,需要mapper 排序得到之後的block 的最大最小值,reduce 端在記憶體中儲存最大最小值,copy  完成後直接用這個值來做隨機讀然後進行reduce. ( block shuffle  的實現可以關注 MAPREDUCE-4039 , hash 計算可以關注 MAPREDUCE-1639)

問題3 . map reduce 只有兩個函式,一個map 一個 reduce, 一旦發生資料傾斜就是partition 失效了,對於join 的例子,某一個key 分配了過多的記錄數,對於只有一次partittion的機會,分配錯了資料傾斜的傷害就已經造成了,這種情況很難除錯,但是如果你是基於map-reduce-reduce 的方式計算,那麼對於同一個key 不需要分配到同一個reduce 中,在第一個reduce 中得到的結果可以在第二個reduce 才彙總去重,第二個reduce 不需要sort – merge 的步驟,因為前一個reduce 已經排過序了,中間的reduce 處理的資料不用關心partition 怎麼分,處理的資料量都是一樣大,而第二個reduce 又不使用sort-merge 來排序,不會遇到現在的記憶體大小的問題,對於skewed join 這種情況瓶頸自然小很多.

方式6:  目前hive 有幾個正在開發中的處理skewed join 情況的jira case,  HIVE-3086 , HIVE-3286 ,HIVE-3026 . 簡單介紹一下就是facebook 希望通過手工處理提前列舉的方式列出單個傾斜的值,在join 的時候將這些值特殊列出當作map join 來處理,對於其他值使用原來的方式. 我個人覺得這太不伸縮了,值本身沒有考慮應用過濾條件和優化方式之後的資料量大小問題,他們提前列出的值都是基於整個分割槽的. join key 如果為組合key 的情況也應該沒有考慮,對metastore 的儲存問題有限制,對輸入的大表和小表都會scan 兩次( 一次處理非skew key , 一次處理skew key 做map join), 對輸出表也會scan 兩次(將兩個結果進行merge) , skew key 必須提前手工列出這又存在額外維護的成本,目前因為還沒有完整的開發完到能夠投入生產的情況,所以等所有特性處理完了有了文件在看看這個處理方式是否有效,我個人認為的思路應該是接著bucked map join 的思路往下走,只不過不用提前處理cluster key 的問題, 這時候cluster key 的選擇應該是join key + 某個能分散join key 的列, 這等於將大表的同一個key 的值分散到了多個不同的reduce 中,而小表的join key 也必須cluster 到跟大表對應的同一個key , join 中對於資料分佈第二種情況不用太難,增加reduce 個數就好,主要是第一種,需要大表的join key 能夠分散,對於同樣join key 的小表又能夠匹配到所有大表中的記錄. 這種思路就是不用掃描大表兩遍或者結果輸出表,不需要提前手工處理,資料是動態sample 的應用了過濾條件之後的資料,而不是提前基於統計資料的不準確結果. 這個基本思路跟tenzing 裡面描述的distributed hash join 是一樣的,想辦法切成合適的大小然後用hash 和 map join .

方式7: 當同時出現join 和group 的時候, 那麼這兩個操作應該是以pipeline (管道) 的方式執行. 在join 的時候就可以直接使用group 的操作符減少大量的資料,而不是等待join 完成,然後寫入磁碟,group 又讀取磁碟做group操作. HIVE-2206 正在做這個優化. hive 裡面是沒有pipeline 這個概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有這種概念的.

方式8: distinct 本身就是group by 的一種簡寫,我原先以為count(distinct x)這種跟group by 是一樣的,但是發現hive 裡面distinct 明顯比group by 要慢,可能跟group by 會有map 端的combiner有關, 另外觀察到hive 在預估count(distinct x) 的reduce 個數比group by 的個數要少 , 所以hive 中使用count(distinct x) , 要麼儘量把reduce 個數設定大,直接設定reduce 個數或者hive.exec.reducers.bytes.per.reducer 調小,我個人比較喜歡調後面一個,hive 目前的reduce 個數沒有統計資訊的情況下就是用map端輸入之前的數值, 如果你是join 之後還用count(distinct x) 的話,這個預設值一般都會悲劇,如果有where 條件並能過濾一定數量的資料,那麼預設reduce 個數可能就還好一點. 不管怎樣,多浪費一點reduce slot 總比等十幾甚至幾十分鐘要好, 或者轉換成group by 的寫法也不錯,寫成group by 的時候distributed by 也很有幫助.

方式9: hive 中的index 就是物化檢視,對於group by 和distinct 的情況等於變成了map 端在做計算,自然不存在傾斜. 尤其是bitmap index , 對於唯一值比較少的列優勢更大,不過index 麻煩的地方在於需要判斷你的sql 是不是常用sql , 另外如果create index 的時候沒有選你查詢的時候用的欄位,這個index 是不能用的( hive 中是永遠不可能有DBMS中的用index 去lookup 或者join 原始表這種概念的)

其他建議:

網上能找到的另外一份很好的描述資料傾斜的資料是

裡面的map side skew 和expensive record 都不是關係型計算中的問題,所以不是這篇文章關注點. 對於關係型計算,其中資料傾斜影響最大的地方在reduce 的sort. 這篇文章裡面最後總結的5點好的建議值得參考,

其中第三條需要你知道應用combiner 和特殊優化方式是否帶來了效能的提升,hive 的map aggr 在資料分佈情況1效果會比較好,資料分佈情況2效果就不大,還有combiner 應用的時候是消耗了系統資源的,確認這種消耗是否值得而不是任何情況下都使用combiner. 

對於第四點關係型計算中map 傾斜情況不太常見. 一種可以舉出來的例子是分割槽不合理,或者hive 中的cluster by 的key 選擇不合理(都是使用目錄的方式分割槽, 目錄是最小處理單元了).

Use domain knowledge when choosing the 
map output partitioning scheme if the reduce operation is 
expensive: Range partition or some other form of explicit 
partition may be better than the default hash-partition
Try different partitioning schemes on sample 
workloads or collect the data distribution at the reduce input 
if a MapReduce job is expected to run several times
Implement a combiner to reduce the amount 
of data going into the reduce-phase and, as such, significantly 
dampen the effects of any type of reduce-skew
Use a pre-processing MapReduce job that 
extracts properties of the input data in the case of a longruning, 
skew-prone map phase. Appropriately partitioning the 
data before the real application runs can significantly reduce 
skew problems in the map phase.
Best Practice 5. Design algorithms whose runtime depends 
only on the amount of input data and not the data distribution.

另外一份是淘寶的資料傾斜總結:

不過我個人覺得幫助不是太大,裡面第一個解決方式空值產生的影響第一個Union All 的方式個人是極力反對的,同一個表尤其是大表掃描兩遍這額外的成本跟收益太不匹配,不推薦,第二個將特殊值變成random 的方式, 這個產生的結果是正確的嘛? 尤其是在各種情況下輸出結果是正確的嘛?裡面背景好像是那個小表users 的主鍵為userid, 然後userid 又是join key , 而且還不為空? 不太推薦,背景條件和輸出的正確性與否存疑.

第二個資料型別不同的問題我覺得跟HIVE-3445 都算是資料建模的問題,提前修改好是一樣的.

第三個是因為淘寶的hadoop 版本中沒有map side hash aggr 的引數吧. 而且寫成distinct 還多了一個MR 步驟,不太推薦.

資料傾斜在MPP 中也是一個課題,這也設計到一個數據重分配的問題,但是相對於MPP 中有比較成熟的機制,一個是mpp 在處理資料初始分佈的時候總是會指定segmented by 或者distributed by 這種顯示分配到不同物理機器上的建表語句. 還有就是統計資訊會幫助執行引擎選擇合適的重新分佈.但是統計資訊也不是萬能的,比如

1:統計資訊的粒度和更新問題.

2: 應用了過濾條件之後的資料也許不符合原始期望的資料分佈.

3: 統計資訊是基於取樣的,總於真實所有資料存在誤差.

4: 統計資訊是基於partittion 的, 對於查詢沒有涉及到partition 欄位的切分就不能使用各partition 只和來表示總體的統計資訊.

5. 臨時表或者多步驟查詢的中間過程資料沒有統計資訊的情況.

6. 各種其他的演算法優化比如in-mapper combiner 或者google Tenzing 的prev – next combine 都會影響統計資訊對於演算法選擇的不同.

總結:

資料傾斜沒有一勞永逸的方式可以解決,瞭解你的資料集的分佈情況,然後瞭解你所使用計算框架的執行機制和瓶頸,針對特定的情況做特定的優化,做多種嘗試,觀察是否有效.