如何解決spark中的資料傾斜問題
發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。
-
1、資料問題造成的資料傾斜
-
找出異常的key
-
如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個。
-
比如: df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k.2,k.1)).sortByKey(false).take(10)
-
如果發現多數資料分佈都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。
-
-
經過分析,傾斜的資料主要有以下三種情況:
-
1、null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。
-
2、無效資料,大量重複的測試資料或是對結果影響不大的有效資料。
-
3、有效資料,業務導致的正常資料分佈。
-
-
解決辦法
-
第1,2種情況,直接對資料進行過濾即可(因為該資料對當前業務不會產生影響)。
-
第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
-
(1) 隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。
-
(2) 對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。
-
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函式自定義.)
-
(4) 使用map join。
-
-
-
案例
-
如果使用reduceByKey因為資料傾斜造成執行失敗的問題。具體操作流程如下:
-
(1) 將原始的 key 轉化為 key + 隨機值(例如Random.nextInt)
-
(2) 對資料進行 reduceByKey(func)
-
(3) 將 key + 隨機值 轉成 key
-
(4) 再對資料進行 reduceByKey(func)
-
-
-
案例操作流程分析:
-
假設說有傾斜的Key,我們給所有的Key加上一個隨機數,然後進行reduceByKey操作;此時同一個Key會有不同的隨機數字首,在進行reduceByKey操作的時候原來的一個非常大的傾斜的Key就分而治之變成若干個更小的Key,不過此時結果和原來不一樣,怎麼破?進行map操作,目的是把隨機數字首去掉,然後再次進行reduceByKey操作。(當然,如果你很無聊,可以再次做隨機數字首),這樣我們就可以把原本傾斜的Key通過分而治之方案分散開來,最後又進行了全域性聚合
-
注意1: 如果此時依舊存在問題,建議篩選出傾斜的資料單獨處理。最後將這份資料與正常的資料進行union即可。
-
注意2: 單獨處理異常資料時,可以配合使用Map Join解決。
-
-
-
2、spark使用不當造成的資料傾斜
-
提高shuffle並行度
-
dataFrame和sparkSql可以設定spark.sql.shuffle.partitions引數控制shuffle的併發度,預設為200。
-
rdd操作可以設定spark.default.parallelism控制併發度,預設引數由不同的Cluster Manager控制。
-
侷限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到資料傾斜的困擾。
-
使用map join 代替reduce join
-
侷限性: 因為是先將小資料傳送到每個executor上,所以資料量不能太大。
-
-