spark資料傾斜分析與解決方案
Spark資料傾斜(資料分佈不均勻)
資料傾斜發生時的現象:
- 絕大多數task(任務)執行得都非常快,但個別task執行極慢。
- OOM(記憶體溢位),這種情況比較少見。
資料傾斜發生的原理
資料傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大,就會發生資料傾斜。比如,大部分key對應的資料是10條,有一個key對應的資料是100萬條,那麼大部分的task只分配了10條資料,可能1秒就完成了,但是那個100萬條資料的task,可能還要經過一兩個小時,整個Spark作業的執行進度是由執行時間最長的那個task決定的。木桶原理。
因此出現數據傾斜的時候,Spark作業看起來會執行得非常緩慢,甚至可能因為某個task處理的資料量過大導致記憶體溢位。
資料傾斜產生在那些地方
首先要看的,就是資料傾斜發生在第幾個stage中。
Stage的劃分是觸發了shuffle操作,才會劃分stage。
觸發shuffle操作的運算元:
distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition
資料傾斜解決方法
1. 使用spark通用的優化方案
2. 分兩種情況,
一種聚合資料傾斜:
- 先對key前加n以內的隨機字首,然後計算,計算完成去掉隨機字首,再次合併結果。N一般來說取值在10左右
一種是join型別的資料傾斜:
a. 先對左表加隨機字首
b. 對右表擴容n倍
c. 執行join操作
d. 去掉結果中的字首
實現程式碼如下
一種聚合資料傾斜:
//key前加隨機數,聚合,再去掉隨機字首
def testAcc(sc: SparkContext) = {
sc.parallelize(List("hello", "hello", "hello", "hello", "world"))
//sc.textFile("d:\\test\\ssc\\bias.txt",20)
.map(word => (word, 1))
//傳統做法,可能會出現資料傾斜
.reduceByKey(_+_)
//解決資料傾斜--加字尾+聚合+去後綴+聚合
.map { case (key, value) => {
val random = new Random();
//將key加隨機字首
(random.nextInt(3) + "_" + key, value)
}
}
//聚合
.reduceByKey(_ + _)
//去隨機字首
.map { case (k, v) => (k.substring(k.indexOf("_") + 1), v) }
//聚合
.reduceByKey(_ + _)
.foreach(println)
Thread.sleep(1000000)
}
一種是join型別的資料傾斜:
二個rdd join操作 rddl.join(rdd2) 左表加字首--右表擴充套件n倍
def testJoin(sc: SparkContext): Unit = {
val rddl=sc.parallelize(List((1,"hello"),(1,"hello"),(1,"hello"),(1,"hello"),(2,"world")))
val rddr=sc.parallelize(List((1,"man"),(2,"woman")))
//傳統方式,可能會出現資料傾斜
//rddl.join(rdd2).foreach(println)
//左側rdd加隨機字首(n以內),右側rdd根據隨機字首擴容n倍
val prefixRdd=rddl.map{case (k,v)=>{
val random = new Random()
(random.nextInt(3) + "_" + k, v)
}}
//右側擴容
val expandRdd=rddr.flatMap{
case (k,v)=>{
val num=List(0,1,2)
num.map(i=>(i+"_"+k,v))
}
}
//去掉字首
prefixRdd.join(expandRdd)
.map{case (k,v)=>(k.split("_")(1),v)}
.foreach(println)
}