1. 程式人生 > >Spark的廣播和累加器的使用

Spark的廣播和累加器的使用

一、廣播變數和累加器

1.1 廣播變數:

廣播變數允許程式設計師將一個只讀的變數快取在每臺機器上,而不用在任務之間傳遞變數。廣播變數可被用於有效地給每個節點一個大輸入資料集的副本。Spark還嘗試使用高效地廣播演算法來分發變數,進而減少通訊的開銷。
Spark的動作通過一系列的步驟執行,這些步驟由分散式的shuffle操作分開。Spark自動地廣播每個步驟每個任務需要的通用資料。這些廣播資料被序列化地快取,在執行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的資料,或者以反序列化形式快取資料是十分重要的時候,顯式地建立廣播變數才有用。

1.2 累加器:

累加器是僅僅被相關操作累加的變數,因此可以在並行中被有效地支援。它可以被用來實現計數器和總和。Spark原生地只支援數字型別的累加器,程式設計者可以新增新型別的支援。如果建立累加器時指定了名字,可以在Spark的UI介面看到。這有利於理解每個執行階段的程序。(對於python還不支援)
累加器通過對一個初始化了的變數v呼叫SparkContext.accumulator(v)來建立。在叢集上執行的任務可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程式能夠讀取它的值,通過累加器的value方法。

二.Java和Scala版本的實戰演示

2.1 Java版本:


/**
 * 例項:利用廣播進行黑名單過濾!
 * 檢查新的資料 根據是否在廣播變數-黑名單內,從而實現過濾資料。
 */
public class BroadcastAccumulator {

    /**
     * 建立一個List的廣播變數
     *
     */
    private static volatile Broadcast<List<String>> broadcastList = null;

    /**
     * 計數器!
     */
    private
static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("WordCountOnlineBroadcast"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5
)); /** * 注意:分發廣播需要一個action操作觸發。 * 注意:廣播的是Arrays的asList 而非物件的引用。廣播Array陣列的物件引用會出錯。 * 使用broadcast廣播黑名單到每個Executor中! */ broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive")); /** * 累加器作為全域性計數器!用於統計線上過濾了多少個黑名單! * 在這裡例項化。 */ accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999); /** * 這裡省去flatmap因為名單是一個個的! */ JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) { return v1 + v2; } }); /** * Funtion裡面 前幾個引數是 入參。 * 後面的出參。 * 體現在call方法裡面! * */ wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() { @Override public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { @Override public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { if (broadcastList.value().contains(wordPair._1)) { /** * accumulator不僅僅用來計數。 * 可以同時寫進資料庫或者快取中。 */ accumulator.add(wordPair._2); return false; }else { return true; } }; /** * 廣播和計數器的執行,需要進行一個action操作! */ }).collect(); System.out.println("廣播器裡面的值"+broadcastList.value()); System.out.println("計時器裡面的值"+accumulator.value()); return null; } }); jsc.start(); jsc.awaitTermination(); jsc.close(); } }

2.2 Scala版本

package com.Streaming

import java.util

import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast

/**
  * Created by lxh on 2016/6/30.
  */
object BroadcastAccumulatorStreaming {

  /**
    * 宣告一個廣播和累加器!
    */
  private var broadcastList:Broadcast[List[String]]  = _
  private var accumulator:Accumulator[Int] = _

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)

    /**
      * duration是ms
      */
    val ssc = new StreamingContext(sc,Duration(2000))
   // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")

    /**
      * 獲取資料!
      */
    val lines = ssc.socketTextStream("localhost",9999)

    /**
      * 1.flatmap把行分割成詞。
      * 2.map把詞變成tuple(word,1)
      * 3.reducebykey累加value
      * (4.sortBykey排名)
      * 4.進行過濾。 value是否在累加器中。
      * 5.列印顯示。
      */
    val words = lines.flatMap(line => line.split(" "))

    val wordpair = words.map(word => (word,1))

    wordpair.filter(record => {broadcastList.value.contains(record._1)})


    val pair = wordpair.reduceByKey(_+_)

    /**
      * 這個pair 是PairDStream<String, Integer>
      * 檢視這個id是否在黑名單中,如果是的話,累加器就+1
      */
/*    pair.foreachRDD(rdd => {
      rdd.filter(record => {

        if (broadcastList.value.contains(record._1)) {
          accumulator.add(1)
          return true
        } else {
          return false
        }

      })

    })*/

    val filtedpair = pair.filter(record => {
        if (broadcastList.value.contains(record._1)) {
          accumulator.add(record._2)
          true
        } else {
          false
        }

     }).print

    println("累加器的值"+accumulator.value)

   // pair.filter(record => {broadcastList.value.contains(record._1)})

   /* val keypair = pair.map(pair => (pair._2,pair._1))*/

    /**
      * 如果DStream自己沒有某個運算元操作。就通過轉化transform!
      */
   /* keypair.transform(rdd => {
      rdd.sortByKey(false)//TODO
    })*/
    pair.print()
    ssc.start()
    ssc.awaitTermination()

  }

}

相關: