1. 程式人生 > >spark資料傾斜分析與解決方案

spark資料傾斜分析與解決方案

Spark資料傾斜(資料分佈不均勻)

資料傾斜發生時的現象:

  1. 絕大多數task(任務)執行得都非常快,但個別task執行極慢。
  2. OOM(記憶體溢位),這種情況比較少見。

資料傾斜發生的原理

資料傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大,就會發生資料傾斜。比如,大部分key對應的資料是10條,有一個key對應的資料是100萬條,那麼大部分的task只分配了10條資料,可能1秒就完成了,但是那個100萬條資料的task,可能還要經過一兩個小時,整個Spark作業的執行進度是由執行時間最長的那個task決定的。木桶原理。

因此出現數據傾斜的時候,Spark作業看起來會執行得非常緩慢,甚至可能因為某個task處理的資料量過大導致記憶體溢位。

資料傾斜產生在那些地方

首先要看的,就是資料傾斜發生在第幾個stage中。

Stage的劃分是觸發了shuffle操作,才會劃分stage。

觸發shuffle操作的運算元:

distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition

資料傾斜解決方法

1.     使用spark通用的優化方案

2.     分兩種情況,

一種聚合資料傾斜:

  1. 先對key前加n以內的隨機字首,然後計算,計算完成去掉隨機字首,再次合併結果。N一般來說取值在10左右

 

一種是join型別的資料傾斜:

a.     先對左表加隨機字首

b.     對右表擴容n倍

c.      執行join操作

d.     去掉結果中的字首

 

實現程式碼如下

一種聚合資料傾斜:

//key前加隨機數,聚合,再去掉隨機字首

  def testAcc(sc: SparkContext) = {

 

 

    sc.parallelize(List("hello", "hello", "hello", "hello", "world"))

      //sc.textFile("d:\\test\\ssc\\bias.txt",20)

      .map(word => (word, 1))

 

      //傳統做法,可能會出現資料傾斜

      .reduceByKey(_+_)

      //解決資料傾斜--加字尾+聚合+去後綴+聚合

      .map { case (key, value) => {

      val random = new Random();

      //將key加隨機字首

      (random.nextInt(3) + "_" + key, value)

    }

    }

      //聚合

      .reduceByKey(_ + _)

      //去隨機字首

      .map { case (k, v) => (k.substring(k.indexOf("_") + 1), v) }

      //聚合

      .reduceByKey(_ + _)

      .foreach(println)

    Thread.sleep(1000000)

  }

 

一種是join型別的資料傾斜:

二個rdd join操作 rddl.join(rdd2) 左表加字首--右表擴充套件n倍

def testJoin(sc: SparkContext): Unit = {

    val rddl=sc.parallelize(List((1,"hello"),(1,"hello"),(1,"hello"),(1,"hello"),(2,"world")))

    val rddr=sc.parallelize(List((1,"man"),(2,"woman")))

    //傳統方式,可能會出現資料傾斜

    //rddl.join(rdd2).foreach(println)

    //左側rdd加隨機字首(n以內),右側rdd根據隨機字首擴容n倍

    val prefixRdd=rddl.map{case (k,v)=>{

      val random = new Random()

      (random.nextInt(3) + "_" + k, v)

    }}

 

    //右側擴容

    val expandRdd=rddr.flatMap{

      case (k,v)=>{

        val num=List(0,1,2)

        num.map(i=>(i+"_"+k,v))

      }

    }

    //去掉字首

    prefixRdd.join(expandRdd)

      .map{case (k,v)=>(k.split("_")(1),v)}

      .foreach(println)

 

  }