1. 程式人生 > 實用技巧 >Spark(八)【廣播變數和累加器】

Spark(八)【廣播變數和累加器】

在spark程式中,當一個傳遞給Spark操作(例如map和reduce)的函式在遠端節點上面執行時,Spark操作實際上操作的是這個函式所用變數的一個獨立副本。這些變數會被複制到每臺機器上,並且這些變數在遠端機器上的所有更新都不會傳遞迴驅動程式。通常跨任務的讀寫變數是低效的,但是,Spark還是為兩種常見的使用模式提供了兩種有限的共享變數:廣播變數(broadcast variable)和累加器(accumulator)

一. 廣播變數

分散式共享只讀變數

使用廣播變數前

使用廣播變數後

  • 廣播變數是將變數複製到每一臺機器上而不是普通變數那樣複製到每個task上,從圖二可以看出變數先是從遠端的driver上覆制到一臺機器上,然後這臺機器的task就從這臺機器上獲取變數,不需要再從遠端節點上獲取,減少了IO,加快了速率。廣播變數只能讀取,並不能修改。經典應用是大表與小表的join中通過廣播變數小表來實現以brodcast join 取代reduce join
  • 廣播是driver進行的操作,必須是有結果的變數,故不可能直接廣播RDD,常常的是將通過collectAsMap運算元將需要廣播的RDD轉換成Map集合然後廣播出去
  • 變數一旦被定義為一個廣播變數,那麼這個變數只能讀,不能修改

使用

sc.broadcast 方法包裝變數

bc.value 去除值

  /**
   * 廣播變數
   */
  def broadcast_test: Unit = {
    val list = List(1, 2, 3, 4)
    val rdd = sc.makeRDD(list, 2)
    val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    //使用廣播變數將list2包裝起來
    val bc: Broadcast[List[Int]] = sc.broadcast(list)
    //使用.value方法去除廣播變數中的值
    rdd.filter(x => bc.value.contains(list))
  }

注意

① 能不能將一個RDD使用廣播變數廣播出去?不能 ,因為RDD是不存資料的。可以將RDD的結果廣播出去。

② 廣播變數不能被修改,必須是隻讀

二. 累加器

分散式只寫變數

累加器用來對資訊進行聚合,通常在向 Spark傳遞函式時,比如使用 map() 函式或者用 filter() 傳條件時,可以使用Driver中定義的變數,但是叢集中執行的每個task任務都會得到這些變數的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變數。如果我們想實現所有分片處理時更新共享變數的功能,那麼累加器可以實現我們想要的效果。

使用

1)基本型別

//1.建立一個累加器
acc : LongAccumulator = sc.longAccumulator("累加器名")
//2.在rdd中使用累加器進行累加
acc.add("值") 

2)自定義型別

    //TODO 建立一個累加器
    val accumulator: WordCountAccumulator = new WordCountAccumulator
    //TODO 註冊累加器
    sc.register(accumulator)
    //呼叫add方法累加資料
    rdd.foreach(x=>accumulator.add(x))

使用累加器前

    val rdd = sc.makeRDD(List(3, 4, 5, 12, 43, 2), 4)
    var sum = 0
    rdd.foreach(sum += _)
    print(sum)
    //sum輸出為0

使用累加器後

  /**
   * 累加器
   */
  @Test
  def accumulator {
    val rdd = sc.makeRDD(List(3, 4, 5, 12, 43, 2), 2)
    //建立一個累加器
    val acc: LongAccumulator = sc.longAccumulator("sum")
    //rdd計算中使用累加器進行累加
    rdd.foreach(acc.add(_))
    print(acc.value)
  }

使用場景

模擬 counter(計數器) 或 執行sum(累加求和), 避免了Shuffle。

Spark預設只提供數值型別(整數,浮點)的累加器

自定義累加器

需求:實現WordCount案例

①自定義一個累加器類

//0.繼承AccumulatorV2[IN,OUT]:宣告兩個泛型,IN:需要累加的資料型別。OUT:累加器返回的資料型別
class WordCountAccumulator() extends AccumulatorV2[String, mutable.Map[String, Int]] {
  //1.用Map接收資料, 返回結果
  private val result: mutable.Map[String, Int] = new mutable.HashMap[String, Int]()

  //2.判斷當前累加器是否為初始化狀態
  override def isZero: Boolean = result.isEmpty

  //3.複製累加器物件
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WordCountAccumulator

  //4.重置累加器
  override def reset(): Unit = result.clear()

  //5.往累加器中累加資料
  override def add(v: String): Unit = {
    //將單詞v新增進map集合,若不存在key為v的資料,value就設為0,然後加1;若已經存在key為v的資料,直接在value基礎上加1.
    result.put(v, result.get(v).getOrElse(0) + 1)
  }

  //6.當前累加器和其他累加器合併
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    //變數other累加器結果,新增進當前累加器
    other.value.foreach {
      case (word, count) => result.put(word, result.get(word).getOrElse(0) + count)
    }
  }

  //7.獲取累加器結果
  override def value: mutable.Map[String, Int] = result
}

②使用累加器進行程式設計

  /**
   * 自定義累加器
   */
  @Test
  def accumulator {
    val rdd = sc.makeRDD(List("hello","spark","hello","hello","scala","spark"), 2)
    //TODO 建立一個累加器
    val accumulator: WordCountAccumulator = new WordCountAccumulator
    //TODO 註冊累加器
    sc.register(accumulator)
    //呼叫add方法累加資料
    rdd.foreach(x=>accumulator.add(x))
    print(accumulator.value)
  }

參考:
Spark累加器(Accumulator)陷阱及解決辦法

Spark的廣播變數和累加器