1. 程式人生 > >Spark之廣播變數Broadcast Variables與計數器Accumulators

Spark之廣播變數Broadcast Variables與計數器Accumulators

一、廣播變數Broadcast Variables
  根據官方文件,廣播變數Broadcast Variables可以使開發者在每個節點–即Executor上快取一個只讀的變數,它相對於在每個task上覆制一份這個變數具有更好的優勢。因為它能減少網路和記憶體的開銷。例如,有一個Map資料,大小為10M。這份資料在spark執行過程中需要被用到。下面是虛擬碼

val mapVar = new HashMap()
val rdd = sc.textFile("....")
rdd.map(x=>{
	...mapVar...
})		

  如果這個spark程式有50個Executor,1000個task。那麼driver會將這個10M的Map分發給1000個task,共計10G。對於網路和記憶體來說,也是一個比較大的消耗。但是假如我們將它作為一個廣播變數,那麼driver只需要分發給50個Executor即可,即500M。相比之下用廣播變數的方式能減少資源的浪費。注意:廣播變數不能過大。

map ==> n task  <== executor
		||
		mapVar

1000task  10m  10G	
50executor 10m 500m

廣播變數用例(join操作)

val g5 = Array(("1","doudou"),("2","qingfeng"),("3","yanjing"),("4","xiazhi"))
//最佳生產實踐,這裡使用collectAsMap(),後面可以很方便的取值
val g5Stu = sc.parallelize(g5).collectAsMap() 
val g5StuBroadcast = sc.broadcast(g5Stu)

val f11 = sc.parallelize(Array(("4","xiazhi"),("6","su")))

f11.mapPartitions(
  partition => {
    val g5Value = g5StuBroadcast.value //獲取廣播變數裡面的內容
     for((key,value) <- partition if g5Value.contains(key))
     //針對每一次 for 迴圈的迭代, yield 會產生一個值,被迴圈記錄下來 (內部實現上,像是一個緩衝區)
     //當迴圈結束後, 會返回所有 yield 的值組成的集合
     //返回集合的型別與被遍歷的集合型別是一致的
      yield (key, g5Value.get(key).getOrElse(""))
  }).collect().foreach(println)

二、計數器Accumulators
  累加器是僅僅被相關操作累加的變數,因此可以在並行中被有效地支援。它可以被用來實現計數器和總和。Spark原生地只支援數字型別的累加器,程式設計者可以新增新型別的支援。如果建立累加器時指定了名字,可以在Spark的UI介面看到。這有利於理解每個執行階段的程序。(對於python還不支援) 。
  累加器通過對一個初始化了的變數v呼叫SparkContext.accumulator(v)來建立。在叢集上執行的任務可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程式能夠讀取它的值,通過累加器的value方法。

用例:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

檢視Spark UI介面
在這裡插入圖片描述
可以看到上面設定的計數器的名稱,總值,以及每個task上的值。