解決spark中遇到的資料傾斜問題
一. 資料傾斜的現象
多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。
二. 資料傾斜的原因
常見於各種shuffle操作,例如reduceByKey,groupByKey,join等操作。
資料問題
- key本身分佈不均勻(包括大量的key為空)
- key的設定不合理
spark使用問題
- shuffle時的併發度不夠
- 計算方式有誤
三. 資料傾斜的後果
- spark中一個stage的執行時間受限於最後那個執行完的task,因此執行緩慢的任務會拖累整個程式的執行速度(分散式程式執行的速度是由最慢的那個task決定的)。
- 過多的資料在同一個task中執行,將會把executor撐爆,造成OOM,程式終止執行。
一個理想的分散式程式:
發生資料傾斜時,任務的執行速度由最大的那個任務決定:
四. 資料問題造成的資料傾斜
發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。
找出異常的key
如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個
df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)
如果發現多數資料分佈都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。
經過分析,傾斜的資料主要有以下三種情況:
- null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。
- 無效資料,大量重複的測試資料或是對結果影響不大的有效資料。
- 有效資料,業務導致的正常資料分佈。
解決辦法
第1,2種情況,直接對資料進行過濾即可。
第3種情況則需要進行一些特殊操作,常見的有以下幾種做法。
- 隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。
- 對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。
- 使用
reduceByKey
groupByKey
- 使用map join。
舉例:
如果使用reduceByKey
因為資料傾斜造成執行失敗的問題。具體操作如下:
- 將原始的
key
轉化為key + 隨機值
(例如Random.nextInt) - 對資料進行
reduceByKey(func)
- 將
key + 隨機值
轉成key
- 再對資料進行
reduceByKey(func)
tip1: 如果此時依舊存在問題,建議篩選出傾斜的資料單獨處理。最後將這份資料與正常的資料進行union即可。
tips2: 單獨處理異常資料時,可以配合使用Map Join解決。
五. spark使用不當造成的資料傾斜
1. 提高shuffle並行度
dataFrame
和sparkSql
可以設定spark.sql.shuffle.partitions
引數控制shuffle的併發度,預設為200。
rdd操作可以設定spark.default.parallelism
控制併發度,預設引數由不同的Cluster Manager控制。
侷限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到資料傾斜的困擾。
2. 使用map join 代替reduce join
在小表不是特別大(取決於你的executor大小)的情況下使用,可以使程式避免shuffle的過程,自然也就沒有資料傾斜的困擾了。
侷限性: 因為是先將小資料傳送到每個executor上,所以資料量不能太大。
具體使用方法和處理流程參照:
相關推薦
如何解決spark中的資料傾斜問題
發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。 1、資料問題造成的資料傾斜 找出異常的key 如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些
解決spark中遇到的資料傾斜問題
一. 資料傾斜的現象 多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。 二. 資料傾斜的原因 常見於各種shuffle操作,例如reduceByKey,groupByKey,join等操作。 資
Spark專案實戰-資料傾斜解決方案之原理以及現象分析
一、資料傾斜的原理 在執行shuffle操作的時候,大家都知道是按照key來進行values的資料的輸出、拉取和聚合的。同一個key的values,一定是分配到一個reduce task進行處理的。假設多個key對應的values,總共是90萬。但是問題是可能某個key對應
Spark專案實戰-資料傾斜解決方案之將reduce join轉換為map join
一、reduce端join操作原理 二、map端join操作原理 三、適用場景 如果兩個RDD要進行join,其中一個RDD是比較小的。一個RDD是100萬資料,一個RDD是1萬資料。(一個RDD是1億資料,一個RDD是100萬資料) 其中一個RDD必須是比較
如何呼叫Spark中的資料標準化庫
在大資料的學習過程中,總有很多小夥伴遇到不知如何呼叫Spark中的資料標準庫,本文的核心這不是在於介紹「資料標準化」,也不是在於實現「Spark呼叫」,畢竟這些概念大家應該耳濡目染了,至於呼叫方法一搜一大堆。今天這個問題也是科多大資料的一名學員提出來的,估計有很多人也遇到類似的問題,一併分享在此,希
Spark中的資料本地性
分散式資料並行環境下,保持資料的本地性是非常重要的內容,事關分散式系統性能高下。 概念: block : HDFS的物理空間概念,固定大小,最小是64M,可以是128,256 。。也就是說單個檔案大於block的大小,肯定會被切分,被切分的數目大概是:比如檔案是250
Hadoop中的資料傾斜整理
最近幾次被問到關於資料傾斜的問題,這裡找了些資料也結合一些自己的理解. 在平行計算中我們總希望分配的每一個task 都能以差不多的粒度來切分並且完成時間相差不大,但是叢集中可能硬體不同,應用的型別不同和切分的資料大小不一致總會導致有部分任務極大的拖慢了整個任務的
資料基礎---spark中的資料型別
mllib中的資料型別 本文是對官方文件的翻譯整理 1、資料型別 Local vector(本地向量) Labeled point(帶標籤資料點) Local matrix(本地矩陣) Distrubuted matrix(分散式矩陣):RowM
map-reduce階段中的資料傾斜問題
MapReduce資料傾斜: mapreduce處理過程有一個特點,相同的key,只能是發給同一個reduce進行處理。 原因:hadoop原始碼中有一行程式碼,(key.hashcode())%numReduce,先把key進行hash然後除以reduce
完美 解決fragment中listview資料丟失問題和問題分析
public class MainpageFind extends Fragment { private ListViewForScrollView mListview; private FindShareAdatpter shareAdatpter = null; privat
Spark:對資料傾斜的八種處理方法
目錄 1. 什麼是資料傾斜 資料傾斜是一種很常見的問題(依據二八定律),簡單來說,比方WordCount中某個Key對應的資料量非常大的話,就會產生資料傾斜,導致兩個後果: OOM(單或少數的節點); 拖慢整個Job
大資料之Spark(二)--- RDD,RDD變換,RDD的Action,解決spark的資料傾斜問題,spark整合hadoop的HA
一、Spark叢集執行 ------------------------------------------------------- 1.local //本地模式 2.standalone //獨立模式 3.yarn //yarn模式
spark資料傾斜分析與解決方案
Spark資料傾斜(資料分佈不均勻) 資料傾斜發生時的現象: 絕大多數task(任務)執行得都非常快,但個別task執行極慢。 OOM(記憶體溢位),這種情況比較少見。 資料傾斜發生的原理 資料傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的k
Spark 執行時常見異常及資料傾斜的解決方法
spark執行異常: 現象1: 有時會出現的一種情況非常普遍,在spark的作業中;shuffle file not found。(spark作業中,非常非常常見的)而且,有的時候,它是偶爾才會出現的一種情況。有的時候,出現這種情況以後,會重新去
spark 大型專案實戰(五十八):資料傾斜解決方案之sample取樣傾斜key進行兩次join
當採用隨機數和擴容表進行join解決資料傾斜的時候,就代表著,你的之前的資料傾斜的解決方案,都沒法使用。 這個方案是沒辦法徹底解決資料傾斜的,更多的,是一種對資料傾斜的緩解。 原理,其實在上一講,已經帶出來了。 步驟: 1、選擇一個RDD,要用flatM
Spark資料傾斜及解決方案
一.場景 1.絕大多數task執行得都非常快,但個別task執行極慢。比如,總共有100個task,97個task都在1s之內執行完了,但是剩餘的task卻要一兩分鐘。這種情況很常見。 2.原本能夠正常執行的Spark作業,某天突然報出OOM(記憶體溢位),觀察異常棧,是我們寫的業務程式碼造成的。
《深入理解Spark》之通過自定義分割槽器解決資料傾斜問題
package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def
Spark資料傾斜的完美解決
資料傾斜解決方案資料傾斜的解決,跟之前講解的效能調優,有一點異曲同工之妙。效能調優中最有效最直接最簡單的方式就是加資源加並行度,並注意RDD架構(複用同一個RDD,加上cache快取)。相對於前面,shuffle、jvm等是次要的。6.1、原理以及現象分析6.1.1、資料傾斜
Spark效能優化之道——解決Spark資料傾斜(Data Skew)的N種姿勢
摘要 本文結合例項詳細闡明瞭Spark資料傾斜的幾種場景以及對應的解決方案,包括避免資料來源傾斜,調整並行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機字首等。 為何要處理資料傾斜(Da
spark1.x-spark-sql-資料傾斜解決方案
聚合源資料 過濾導致傾斜的key where條件 提高shuffle並行度 spark.sql.shuffle.partitions sqlContext.setConf("spark.sql.shuffle.partitions","1000")