Spark的廣播和累加器的使用
阿新 • • 發佈:2019-01-27
一、廣播變數和累加器
1.1 廣播變數:
廣播變數允許程式設計師將一個只讀的變數快取在每臺機器上,而不用在任務之間傳遞變數。廣播變數可被用於有效地給每個節點一個大輸入資料集的副本。Spark還嘗試使用高效地廣播演算法來分發變數,進而減少通訊的開銷。
Spark的動作通過一系列的步驟執行,這些步驟由分散式的shuffle操作分開。Spark自動地廣播每個步驟每個任務需要的通用資料。這些廣播資料被序列化地快取,在執行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的資料,或者以反序列化形式快取資料是十分重要的時候,顯式地建立廣播變數才有用。
1.2 累加器:
累加器是僅僅被相關操作累加的變數,因此可以在並行中被有效地支援。它可以被用來實現計數器和總和。Spark原生地只支援數字型別的累加器,程式設計者可以新增新型別的支援。如果建立累加器時指定了名字,可以在Spark的UI介面看到。這有利於理解每個執行階段的程序。(對於python還不支援)
累加器通過對一個初始化了的變數v呼叫SparkContext.accumulator(v)來建立。在叢集上執行的任務可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程式能夠讀取它的值,通過累加器的value方法。
二.Java和Scala版本的實戰演示
2.1 Java版本:
/**
* 例項:利用廣播進行黑名單過濾!
* 檢查新的資料 根據是否在廣播變數-黑名單內,從而實現過濾資料。
*/
public class BroadcastAccumulator {
/**
* 建立一個List的廣播變數
*
*/
private static volatile Broadcast<List<String>> broadcastList = null;
/**
* 計數器!
*/
private static volatile Accumulator<Integer> accumulator = null;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[2]").
setAppName("WordCountOnlineBroadcast");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5 ));
/**
* 注意:分發廣播需要一個action操作觸發。
* 注意:廣播的是Arrays的asList 而非物件的引用。廣播Array陣列的物件引用會出錯。
* 使用broadcast廣播黑名單到每個Executor中!
*/
broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
/**
* 累加器作為全域性計數器!用於統計線上過濾了多少個黑名單!
* 在這裡例項化。
*/
accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);
/**
* 這裡省去flatmap因為名單是一個個的!
*/
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}
});
/**
* Funtion裡面 前幾個引數是 入參。
* 後面的出參。
* 體現在call方法裡面!
*
*/
wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
if (broadcastList.value().contains(wordPair._1)) {
/**
* accumulator不僅僅用來計數。
* 可以同時寫進資料庫或者快取中。
*/
accumulator.add(wordPair._2);
return false;
}else {
return true;
}
};
/**
* 廣播和計數器的執行,需要進行一個action操作!
*/
}).collect();
System.out.println("廣播器裡面的值"+broadcastList.value());
System.out.println("計時器裡面的值"+accumulator.value());
return null;
}
});
jsc.start();
jsc.awaitTermination();
jsc.close();
}
}
2.2 Scala版本
package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
* Created by lxh on 2016/6/30.
*/
object BroadcastAccumulatorStreaming {
/**
* 宣告一個廣播和累加器!
*/
private var broadcastList:Broadcast[List[String]] = _
private var accumulator:Accumulator[Int] = _
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
val sc = new SparkContext(sparkConf)
/**
* duration是ms
*/
val ssc = new StreamingContext(sc,Duration(2000))
// broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
/**
* 獲取資料!
*/
val lines = ssc.socketTextStream("localhost",9999)
/**
* 1.flatmap把行分割成詞。
* 2.map把詞變成tuple(word,1)
* 3.reducebykey累加value
* (4.sortBykey排名)
* 4.進行過濾。 value是否在累加器中。
* 5.列印顯示。
*/
val words = lines.flatMap(line => line.split(" "))
val wordpair = words.map(word => (word,1))
wordpair.filter(record => {broadcastList.value.contains(record._1)})
val pair = wordpair.reduceByKey(_+_)
/**
* 這個pair 是PairDStream<String, Integer>
* 檢視這個id是否在黑名單中,如果是的話,累加器就+1
*/
/* pair.foreachRDD(rdd => {
rdd.filter(record => {
if (broadcastList.value.contains(record._1)) {
accumulator.add(1)
return true
} else {
return false
}
})
})*/
val filtedpair = pair.filter(record => {
if (broadcastList.value.contains(record._1)) {
accumulator.add(record._2)
true
} else {
false
}
}).print
println("累加器的值"+accumulator.value)
// pair.filter(record => {broadcastList.value.contains(record._1)})
/* val keypair = pair.map(pair => (pair._2,pair._1))*/
/**
* 如果DStream自己沒有某個運算元操作。就通過轉化transform!
*/
/* keypair.transform(rdd => {
rdd.sortByKey(false)//TODO
})*/
pair.print()
ssc.start()
ssc.awaitTermination()
}
}