Using Custom Accumulators (AccumulatorV2) in Spark 2.x
阿新 • Published: 2019-02-02
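Deprecation
Since Spark 2.x, the old accumulator API has been deprecated and replaced by AccumulatorV2.
New APIs
Create and register a long accumulator, which starts at 0 and accumulates inputs via add: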
def longAccumulator(name: String): LongAccumulator = {
  val acc = new LongAccumulator
  register(acc, name)
  acc
}
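Below is a minimal usage sketch for the built-in long accumulator. It assumes a local Spark 2.x setup and counts the inputs that fail to parse as Int. The object name LongAccumulatorDemo, the helper countUnparsable, the sample data, and the local[2] master are hypothetical choices made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

// Hypothetical demo object; not part of Spark
object LongAccumulatorDemo {
  // Counts inputs that do not parse as Int, using a built-in LongAccumulator
  def countUnparsable(sc: SparkContext, input: Seq[String]): Long = {
    val bad = sc.longAccumulator("unparsable") // registered with sc, starts at 0
    // foreach is an action, so each task's update is applied once
    sc.parallelize(input).foreach { s =>
      if (Try(s.toInt).isFailure) bad.add(1L)
    }
    bad.sum // total of all added values (the same number value returns)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LongAccumulatorDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(countUnparsable(sc, Seq("1", "2", "x", "4", "", "6"))) // prints 2
    finally sc.stop()
  }
}
```

The accumulator is created inside the helper, so every call starts again from 0.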
Create and register a double accumulator, which starts at 0 and accumulates inputs via add:
def doubleAccumulator(name: String): DoubleAccumulator = {
  val acc = new DoubleAccumulator
  register(acc, name)
  acc
}
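Here is a similar sketch for the double accumulator, again assuming local mode. DoubleAccumulator tracks both a running sum and a count of added values, so sum, count, and avg can all be read once the action finishes. DoubleAccumulatorDemo, stats, and the sample values are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; not part of Spark
object DoubleAccumulatorDemo {
  // Returns (sum, count, average) of the inputs, gathered through one DoubleAccumulator
  def stats(sc: SparkContext, input: Seq[Double]): (Double, Long, Double) = {
    val acc = sc.doubleAccumulator("prices") // registered with sc, starts at 0.0
    sc.parallelize(input).foreach(x => acc.add(x)) // action: updates are applied when it runs
    (acc.sum, acc.count, acc.avg) // avg = sum / count
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DoubleAccumulatorDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(stats(sc, Seq(1.5, 2.5, 3.0, 5.0))) // prints (12.0,4,3.0)
    finally sc.stop()
  }
}
```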
Create and register a CollectionAccumulator, which starts with an empty list and accumulates inputs by adding them to the list:
def collectionAccumulator[T](name: String): CollectionAccumulator[T] = {
  val acc = new CollectionAccumulator[T]
  register(acc, name)
  acc
}
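Next is a sketch for the collection accumulator, assuming local mode and Scala 2.11/2.12 (it uses JavaConverters). It collects the negative inputs. The accumulator's value is a java.util.List[T], and partitions may finish in any order, so the sketch sorts the result before returning it. CollectionAccumulatorDemo and negatives are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

// Hypothetical demo object; not part of Spark
object CollectionAccumulatorDemo {
  // Collects the negative inputs through a CollectionAccumulator
  def negatives(sc: SparkContext, input: Seq[Int]): Seq[Int] = {
    val acc = sc.collectionAccumulator[Int]("negatives") // starts as an empty list
    sc.parallelize(input).foreach { x => if (x < 0) acc.add(x) }
    // value is a java.util.List[Int]; element order across partitions is not guaranteed
    acc.value.asScala.toList.sorted
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CollectionAccumulatorDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(negatives(sc, Seq(3, -1, 4, -5, 0))) // prints List(-5, -1)
    finally sc.stop()
  }
}
```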
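Custom accumulators
1. The class extends AccumulatorV2[String, String]. The first type parameter is the input type (what add accepts), and the second is the output type (what value returns).
2. Override the abstract methods:
isZero: returns whether this accumulator is in its zero (initial) state, e.g. 0 for a counter or an empty list for a collection.
copy: creates a new accumulator that is a copy of this one.
reset: resets this accumulator to its zero state.
add: takes one input and accumulates it into this accumulator.
merge: merges another accumulator of the same type into this one.
value: returns the current accumulated result to callers.
For reference, see the LongAccumulator source code: link [lines 291-361]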
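The following sketch shows how each of these methods is used. It defines a second, hypothetical accumulator, MaxAccumulator (AccumulatorV2[Long, Long]), which tracks the largest value seen. It then runs a rough, Spark-free simulation of the lifecycle. Each simulated "task" works on a zeroed copy obtained via copyAndReset() (which calls copy() and then reset()) and adds its partition's values. The "driver" then merges each copy back. The simulation only approximates what Spark does internally. It needs spark-core on the classpath but no SparkContext.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical example: tracks the maximum Long added (IN = Long, OUT = Long)
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max: Long = Long.MinValue // zero value: nothing seen yet

  override def isZero: Boolean = _max == Long.MinValue

  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }

  override def reset(): Unit = _max = Long.MinValue

  override def add(v: Long): Unit = _max = math.max(_max, v)

  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
    case o: MaxAccumulator => _max = math.max(_max, o._max)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }

  override def value: Long = _max
}

// Rough, Spark-free simulation of the accumulator lifecycle (not Spark's real code path)
object MaxAccumulatorLifecycle {
  def run(partitions: Seq[Seq[Long]]): Long = {
    val driverAcc = new MaxAccumulator
    partitions.foreach { part =>
      val taskAcc = driverAcc.copyAndReset() // each "task" starts from a zeroed copy
      require(taskAcc.isZero, "copyAndReset must produce a zero value")
      part.foreach(v => taskAcc.add(v))      // the task accumulates its partition locally
      driverAcc.merge(taskAcc)               // the "driver" merges the partial result
    }
    driverAcc.value
  }

  def main(args: Array[String]): Unit =
    println(run(Seq(Seq(3L, 9L), Seq(4L), Seq.empty))) // prints 9
}
```

Choosing Long.MinValue as the zero value keeps merge correct even for an empty partition, because max with the zero value changes nothing.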
Below is a simple example that concatenates strings.
1. Definition: MyAccumulator
import org.apache.spark.util.AccumulatorV2

// IN = String (values passed to add), OUT = String (result returned by value)
class MyAccumulator extends AccumulatorV2[String, String] {
  private var res = ""

  // Zero state: nothing accumulated yet
  override def isZero: Boolean = res == ""

  // Combine another partial result (e.g. from a task) into this one
  override def merge(other: AccumulatorV2[String, String]): Unit = other match {
    case o: MyAccumulator => res += o.res
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def copy(): MyAccumulator = {
    val newMyAcc = new MyAccumulator
    newMyAcc.res = this.res
    newMyAcc
  }

  override def value: String = res

  // Append each input followed by a "-" separator
  override def add(v: String): Unit = res += v + "-"

  override def reset(): Unit = res = ""
}
2. Usage:
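import org.apache.spark.{SparkConf, SparkContext}

object Accumulator1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Accumulator1").setMaster("local")
    val sc = new SparkContext(conf)
    // A custom accumulator must be registered with the SparkContext before use
    val myAcc = new MyAccumulator
    sc.register(myAcc, "myAcc")
    // val acc = sc.longAccumulator("avg")  // built-in alternative, unused here
    val nums = Array("1", "2", "3", "4", "5", "6", "7", "8")
    val numsRdd = sc.parallelize(nums)
    // foreach is an action, so the updates are applied when it runs
    numsRdd.foreach(num => myAcc.add(num))
    // Print the accumulated value; println(myAcc) would print the accumulator's toString instead
    println(myAcc.value)
    sc.stop()
  }
}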
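3. Result: 1-2-3-4-5-6-7-8-
With setMaster("local") the RDD has a single partition, so the inputs are added in order. With several partitions, the order in which partial results are merged is not guaranteed.
Caveats
Accumulators do not change Spark's lazy evaluation. An update made inside a lazy transformation such as map() runs only when an action computes that RDD, so it is not guaranteed to happen at all. Spark guarantees that each task's update is applied only once for updates made inside actions (such as foreach()). Inside transformations, an update may be applied more than once if a task or stage is re-executed.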
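val accum = sc.longAccumulator
val data = sc.parallelize(Array(1, 2, 3, 4)) // sample data for illustration
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no action has caused the map operation to be computed.
println(accum.value) // 0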
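Laziness also has a flip side: an accumulator updated inside map() is updated again every time an action recomputes that RDD. The sketch below assumes local mode and the sample data Array(1, 2, 3, 4). AccumulatorRecomputeDemo and valuesAfterActions are hypothetical names. Without cache(), two count() calls run map() twice and double the total. With cache(), the second count() reads the cached partitions (assuming they are not evicted), so the accumulator is not updated again.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; not part of Spark
object AccumulatorRecomputeDemo {
  // Reads the accumulator before any action, after one count() and after a second count()
  def valuesAfterActions(sc: SparkContext, cached: Boolean): Seq[Long] = {
    val accum = sc.longAccumulator
    val mapped = sc.parallelize(Array(1, 2, 3, 4)).map { x => accum.add(x); x }
    if (cached) mapped.cache()
    val beforeAction = accum.sum // 0: map() is lazy, nothing has run yet
    mapped.count()               // first action runs map(): adds 1 + 2 + 3 + 4 = 10
    val afterFirst = accum.sum
    mapped.count()               // uncached: map() runs again; cached: partitions are reused
    val afterSecond = accum.sum
    Seq(beforeAction, afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccumulatorRecomputeDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(valuesAfterActions(sc, cached = false)) // prints List(0, 10, 20)
      println(valuesAfterActions(sc, cached = true))  // prints List(0, 10, 10)
    } finally sc.stop()
  }
}
```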