1. 程式人生 > >Spark踩坑記:共享變數

Spark踩坑記:共享變數

收錄待用,修改轉載已取得騰訊雲授權

前言

前面總結的幾篇spark踩坑博文中,我總結了自己在使用spark過程當中踩過的一些坑和經驗。我們知道Spark是多機器叢集部署的,分為Driver/Master/Worker,Master負責資源排程,Worker是不同的運算節點,由Master統一排程。

而Driver是我們提交Spark程式的節點,並且所有的reduce型別的操作都會彙總到Driver節點進行整合。節點之間會將map/reduce等操作函式傳遞一個獨立副本到每一個節點,這些變數也會複製到每臺機器上,而節點之間的運算是相互獨立的,變數的更新並不會傳遞迴Driver程式。

那麼有個問題,如果我們想在節點之間共享一份變數,比如一份公共的配置項,該怎麼辦呢?Spark為我們提供了兩種特定的共享變數,來完成節點間變數的共享。 本文首先簡單的介紹spark以及spark streaming中累加器和廣播變數的使用方式,然後重點介紹一下如何更新廣播變數。

累加器

顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變數,因此它能夠高效的應用於並行操作中。它們能夠用來實現counters和sums。Spark原生支援數值型別的累加器,開發者可以自己新增支援的型別,在2.0.0之前的版本中,通過繼承AccumulatorParam來實現,而2.0.0之後的版本需要繼承AccumulatorV2來實現自定義型別的累加器。

如果建立了一個具名的累加器,它可以在spark的UI中顯示。這對於理解執行階段(running stages)的過程有很重要的作用。如下圖:

在2.0.0之前版本中,累加器的宣告使用方式如下:

scala> val accum = sc.accumulator(0
, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10

累加器的宣告在2.0.0發生了變化,到2.1.0也有所變化,具體可以參考官方文件,我們這裡以2.1.0為例將程式碼貼一下:

scala> val accum = sc.longAccumulator
("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10

廣播變數

累加器比較簡單直觀,如果我們需要在spark中進行一些全域性統計就可以使用它。但是有時候僅僅一個累加器並不能滿足我們的需求,比如資料庫中一份公共配置表格,需要同步給各個節點進行查詢。OK先來簡單介紹下spark中的廣播變數:

廣播變數允許程式設計師快取一個只讀的變數在每臺機器上面,而不是每個任務儲存一份拷貝。例如,利用廣播變數,我們能夠以一種更有效率的方式將一個大資料量輸入集合的副本分配給每個節點。Spark也嘗試著利用有效的廣播演算法去分配廣播變數,以減少通訊的成本。

一個廣播變數可以通過呼叫SparkContext.broadcast(v)方法從一個初始變數v中建立。廣播變數是v的一個包裝變數,它的值可以通過value方法訪問,下面的程式碼說明了這個過程:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

從上文我們可以看出廣播變數的宣告很簡單,呼叫broadcast就能搞定,並且scala中一切可序列化的物件都是可以進行廣播的,這就給了我們很大的想象空間,可以利用廣播變數將一些經常訪問的大變數進行廣播,而不是每個任務儲存一份,這樣可以減少資源上的浪費。

更新廣播變數(rebroadcast)

廣播變數可以用來更新一些大的配置變數,比如資料庫中的一張表格,那麼有這樣一個問題,如果資料庫當中的配置表格進行了更新,我們需要重新廣播變數該怎麼做呢。上文對廣播變數的說明中,我們知道廣播變數是隻讀的,也就是說廣播出去的變數沒法再修改,那麼我們應該怎麼解決這個問題呢?
答案是利用spark中的unpersist函式

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

上文是從spark官方文件摘抄出來的,我們可以看出,正常來說每個節點的資料是不需要我們操心的,spark會自動按照LRU規則將老資料刪除,如果需要手動刪除可以呼叫unpersist函式。

那麼更新廣播變數的基本思路:將老的廣播變數刪除(unpersist),然後重新廣播一遍新的廣播變數,為此簡單包裝了一個用於廣播和更新廣播變數的wraper類,如下:

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

// This wrapper lets us update brodcast variables within DStreams' foreachRDD
// without running into serialization issues
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {
    // 刪除RDD是否需要鎖定
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

利用該wrapper更新廣播變數,大致的處理邏輯如下:

// 定義
val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue)

yourStream.transform(rdd => {
  //定期更新廣播變數
  if (System.currentTimeMillis - someTime > Conf.updateFreq) {
    yourBroadcast.update(newValue, true)
  }
  // do something else
})

總結

spark中的共享變數是我們能夠在全域性做出一些操作,比如record總數的統計更新,一些大變數配置項的廣播等等。而對於廣播變數,我們也可以監控資料庫中的變化,做到定時的重新廣播新的資料表配置情況,另外我使用上述方式,在每天千萬級的資料實時流統計中表現穩定,所以有相似問題的同學也可以進行嘗試,有任何問題,歡迎隨時騷擾溝通^v^

廣告下我們專案:專注於遊戲輿情的挖掘分析,歡迎大家來踩踩

參考文獻