1. 程式人生 > >如何解決spark中的資料傾斜問題

如何解決spark中的資料傾斜問題

發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。

  • 1、資料問題造成的資料傾斜

    • 找出異常的key

      • 如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個。

      • 比如: df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k.2,k.1)).sortByKey(false).take(10)

      • 如果發現多數資料分佈都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。

    • 經過分析,傾斜的資料主要有以下三種情況:

      • 1、null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。

      • 2、無效資料,大量重複的測試資料或是對結果影響不大的有效資料。

      • 3、有效資料,業務導致的正常資料分佈。

    • 解決辦法

      • 第1,2種情況,直接對資料進行過濾即可(因為該資料對當前業務不會產生影響)。

      • 第3種情況則需要進行一些特殊操作,常見的有以下幾種做法

        • (1) 隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。

        • (2) 對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。

        • (3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義.)

        • (4) 使用map join。

    • 案例

      • 如果使用reduceByKey因為資料傾斜造成執行失敗的問題。具體操作流程如下:

        • (1) 將原始的 key 轉化為 key + 隨機值(例如Random.nextInt)

        • (2) 對資料進行 reduceByKey(func)

        • (3) 將 key + 隨機值 轉成 key

        • (4) 再對資料進行 reduceByKey(func)

    • 案例操作流程分析:

      • 假設說有傾斜的Key,我們給所有的Key加上一個隨機數,然後進行reduceByKey操作;此時同一個Key會有不同的隨機數字首,在進行reduceByKey操作的時候原來的一個非常大的傾斜的Key就分而治之變成若干個更小的Key,不過此時結果和原來不一樣,怎麼破?進行map操作,目的是把隨機數字首去掉,然後再次進行reduceByKey操作。(當然,如果你很無聊,可以再次做隨機數字首),這樣我們就可以把原本傾斜的Key通過分而治之方案分散開來,最後又進行了全域性聚合

      • 注意1: 如果此時依舊存在問題,建議篩選出傾斜的資料單獨處理。最後將這份資料與正常的資料進行union即可。

      • 注意2: 單獨處理異常資料時,可以配合使用Map Join解決。

  • 2、spark使用不當造成的資料傾斜

    • 提高shuffle並行度

      • dataFrame和sparkSql可以設定spark.sql.shuffle.partitions引數控制shuffle的併發度,預設為200。

      • rdd操作可以設定spark.default.parallelism控制併發度,預設引數由不同的Cluster Manager控制。

      • 侷限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到資料傾斜的困擾。

      • 使用map join 代替reduce join

      • 侷限性: 因為是先將小資料傳送到每個executor上,所以資料量不能太大。