1. 程式人生 > >【Spark調優】小表join大表數據傾斜解決方案

【Spark調優】小表join大表數據傾斜解決方案

htm strong 函數 ref 通過 拉取 使用 就是 logs

【使用場景】  

  對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數據量比較小(例如幾百MB或者1~2GB),比較適用此方案。

【解決方案】

  小表join大表轉為小表broadcast+map大表實現。具體為:

  普通的join是會shuffle的,而一旦shuffle,就相當於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join,此時如果發生數據傾斜,影響處理性能,而此時恰好一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,因為這樣不會發生shuffle,也就不會發生數據傾斜。

  也就是說,不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然後對其創建一個Broadcast變量;接著對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那麽就將兩個RDD的數據需要的方式連接起來。

【方案優點】

  join操作導致的數據傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生數據傾斜,是一種治標治本的解決方案。

【方案局限

  這個方案只適用於一個大表和一個小表join的情況。因為解決方案是需要將小表進行廣播,此時會比較消耗內存資源driver和每個Executor內存中都會駐留一份小RDD的全量數據。如果廣播出去的RDD數據比較大,比如10G以上,那麽就可能發生內存溢出了。因此並不適合兩個都是大表的情況。

【代碼實現】

  我對上述方案做了代碼實現,見我的github:https://github.com/wwcom614/Spark

  Java版實現

  Scala版實現

  下一篇:

  上一篇:【Spark調優】聚合操作數據傾斜解決方案

【Spark調優】小表join大表數據傾斜解決方案