1. 程式人生 > >spark完整的數據傾斜解決方案

spark完整的數據傾斜解決方案

date 怎麽 數據傾斜 xtra 直接 agg com 之前 ima

1、數據傾斜的原理

2、數據傾斜的現象

3、數據傾斜的產生原因與定位

技術分享圖片

在執行shuffle操作的時候,大家都知道,我們之前講解過shuffle的原理。

是按照key,來進行values的數據的輸出、拉取和聚合的。 同一個key的values,一定是分配到一個reduce task進行處理的。 多個key對應的values,總共是90萬。

但是問題是,可能某個key對應了88萬數據,key-88萬values,分配到一個task上去面去執行。 另外兩個task,可能各分配到了1萬數據,可能是數百個key,對應的1萬條數據。

想象一下,出現數據傾斜以後的運行的情況。很糟糕!極其糟糕!無比糟糕!

第一個和第二個task,各分配到了1萬數據;那麽可能1萬條數據,需要10分鐘計算完畢;第一個和第二個task,可能同時在10分鐘內都運行完了;第三個task要88萬條,88 * 10 = 880分鐘 = 14.5個小時;

大家看看,本來另外兩個task很快就運行完畢了(10分鐘),但是由於一個拖後腿的家夥,第三個task,要14.5個小時才能運行完,就導致整個spark作業,也得14.5個小時才能運行完。

導致spark作業,跑的特別特別特別特別慢!!!像老牛拉破車! 數據傾斜,一旦出現,是不是性能殺手。。。。

發生數據傾斜以後的現象:

spark數據傾斜,有兩種表現:

1、你的大部分的task,都執行的特別特別快,刷刷刷,就執行完了(你要用client模式,standalone client,yarn client,本地機器主要一執行spark-submit腳本,就會開始打印log),task175 finished;剩下幾個task,執行的特別特別慢,前面的task,一般1s可以執行完5個;最後發現1000個task,998,999 task,要執行1個小時,2個小時才能執行完一個task。 出現數據傾斜了 還算好的,因為雖然老牛拉破車一樣,非常慢,但是至少還能跑。

2、運行的時候,其他task都刷刷刷執行完了,也沒什麽特別的問題;但是有的task,就是會突然間,啪,報了一個OOM,JVM Out Of Memory,內存溢出了,task failed,task lost,resubmitting task。反復執行幾次都到了某個task就是跑不通,最後就掛掉。 某個task就直接OOM,那麽基本上也是因為數據傾斜了,task分配的數量實在是太大了!!!所以內存放不下,然後你的task每處理一條數據,還要創建大量的對象。內存爆掉了。 出現數據傾斜了 這種就不太好了,因為你的程序如果不去解決數據傾斜的問題,壓根兒就跑不出來。 作業都跑不完,還談什麽性能調優這些東西。扯淡。。。

定位原因與出現問題的位置:

根據log去定位 出現數據傾斜的原因,基本只可能是因為發生了shuffle操作,在shuffle的過程中,出現了數據傾斜的問題。因為某個,或者某些key對應的數據,遠遠的高於其他的key。

1、你在自己的程序裏面找找,哪些地方用了會產生shuffle的算子,groupByKey、countByKey、reduceByKey、join

2、看log log一般會報是在你的哪一行代碼,導致了OOM異常;或者呢,看log,看看是執行到了第幾個stage!!! 我們這裏不會去剖析stage的劃分算法,spark代碼,是怎麽劃分成一個一個的stage的。哪一個stage,task特別慢,就能夠自己用肉眼去對你的spark代碼進行stage的劃分,就能夠通過stage定位到你的代碼,哪裏發生了數據傾斜 去找找,代碼那個地方,是哪個shuffle操作。

=================================================================================================================================================

數據傾斜的解決,跟之前講解的性能調優,有一點異曲同工之妙。 性能調優,跟大家講過一個道理,“重劍無鋒”。性能調優,調了半天,最有效,最直接,最簡單的方式,就是加資源,加並行度,註意RDD架構(復用同一個RDD,加上cache緩存);shuffle、jvm等,次要的。 數據傾斜,解決方案,第一個方案和第二個方案,一起來講。最樸素、最簡譜、最直接、最有效、最簡單的,解決數據傾斜問題的方案。 第一個方案:聚合源數據 第二個方案:過濾導致傾斜的key 重劍無鋒。後面的五個方案,尤其是最後4個方案,都是那種特別炫酷的方案。雙重group聚合方案;sample抽樣分解聚合方案;如果碰到了數據傾斜的問題。上來就先考慮考慮第一個和第二個方案,能不能做,如果能做的話,後面的5個方案,都不用去搞了。 有效。簡單。直接。效果是非常之好的。徹底根除了數據傾斜的問題。

第一個方案:聚合源數據 咱們現在,做一些聚合的操作,groupByKey、reduceByKey;groupByKey,說白了,就是拿到每個key對應的values;reduceByKey,說白了,就是對每個key對應的values執行一定的計算。 現在這些操作,比如groupByKey和reduceByKey,包括之前說的join。都是在spark作業中執行的。 spark作業的數據來源,通常是哪裏呢?90%的情況下,數據來源都是hive表(hdfs,大數據分布式存儲系統)。hdfs上存儲的大數據。hive表,hive表中的數據,通常是怎麽出來的呢?有了spark以後,hive比較適合做什麽事情?hive就是適合做離線的,晚上淩晨跑的,ETL(extract transform load,數據的采集、清洗、導入),hive sql,去做這些事情,從而去形成一個完整的hive中的數據倉庫;說白了,數據倉庫,就是一堆表。 spark作業的源表,hive表,其實通常情況下來說,也是通過某些hive etl生成的。hive etl可能是晚上淩晨在那兒跑。今天跑昨天的數九。 數據傾斜,某個key對應的80萬數據,某些key對應幾百條,某些key對應幾十條;現在,咱們直接在生成hive表的hive etl中,對數據進行聚合。比如按key來分組,將key對應的所有的values,全部用一種特殊的格式,拼接到一個字符串裏面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

對key進行group,在spark中,拿到key=sessionid,values<Iterable>;hive etl中,直接對key進行了聚合。那麽也就意味著,每個key就只對應一條數據。在spark中,就不需要再去執行groupByKey+map這種操作了。直接對每個key對應的values字符串,map操作,進行你需要的操作即可。key,values串。 spark中,可能對這個操作,就不需要執行shffule操作了,也就根本不可能導致數據傾斜。 或者是,對每個key在hive etl中進行聚合,對所有values聚合一下,不一定是拼接起來,可能是直接進行計算。reduceByKey,計算函數,應用在hive etl中,每個key的values。

聚合源數據方案,第二種做法 你可能沒有辦法對每個key,就聚合出來一條數據; 那麽也可以做一個妥協;對每個key對應的數據,10萬條;有好幾個粒度,比如10萬條裏面包含了幾個城市、幾天、幾個地區的數據,現在放粗粒度;直接就按照城市粒度,做一下聚合,幾個城市,幾天、幾個地區粒度的數據,都給聚合起來。比如說 city_id date area_id select ... from ... group by city_id 盡量去聚合,減少每個key對應的數量,也許聚合到比較粗的粒度之後,原先有10萬數據量的key,現在只有1萬數據量。減輕數據傾斜的現象和問題。

上面講的第一種方案,其實這裏沒法講的太具體和仔細;只能給一個思路。但是我覺得,思路已經講的非常清晰了;一般來說,大家只要有一些大數據(hive)。經驗,我覺得都是可以理解的。 具體怎麽去在hive etl中聚合和操作,就得根據你碰到數據傾斜問題的時候,你的spark作業的源hive表的具體情況,具體需求,具體功能,具體分析。

對於我們的程序來說,完全可以將aggregateBySession()這一步操作,放在一個hive etl中來做,形成一個新的表。對每天的用戶訪問行為數據,都按session粒度進行聚合,寫一個hive sql。 在spark程序中,就不要去做groupByKey+mapToPair這種算子了。直接從當天的session聚合表中,用Spark SQL查詢出來對應的數據,即可。這個RDD在後面就可以使用了。

第二個方案:過濾導致傾斜的key 如果你能夠接受某些數據,在spark作業中直接就摒棄掉,不使用。比如說,總共有100萬個key。只有2個key,是數據量達到10萬的。其他所有的key,對應的數量都是幾十。 這個時候,你自己可以去取舍,如果業務和需求可以理解和接受的話,在你從hive表查詢源數據的時候,直接在sql中用where條件,過濾掉某幾個key。 那麽這幾個原先有大量數據,會導致數據傾斜的key,被過濾掉之後,那麽在你的spark作業中,自然就不會發生數據傾斜了。

spark完整的數據傾斜解決方案