1. 程式人生 > >Flink與SparkStreaming之Counters& Accumulators累加器雙向應用案例實戰-Flink牛刀小試

Flink與SparkStreaming之Counters& Accumulators累加器雙向應用案例實戰-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

1 累加器應用場景

Accumulator即累加器,與Saprk Accumulator 的應用場景差不多,都能很好地觀察task在執行期間的資料變化 可以在Flink job任務中的運算元函式中操作累加器,但是隻能在任務執行結束之後才能獲得累加器的最終結果。 Counter是一個具體的累加器(Accumulator)實現

2 Flink與SparkStreaming雙向應用對比

2.1 Flink累加器使用

Counter是一個具體的累加器(Accumulator)實現,如IntCounter, LongCounter 和 DoubleCounter。

用法

  • 1:建立累加器

    private IntCounter numLines = new IntCounter();

  • 2:註冊累加器

    getRuntimeContext().addAccumulator("num-lines", this.numLines);

  • 3:使用累加器

    this.numLines.add(1);

  • 4:獲取累加器的結果

    myJobExecutionResult.getAccumulatorResult("num-lines")

2.2 SparkStreaming累加器使用

  • 1:設定自定義累加器,實現所有資料的統計功能,注意累加器也是懶執行的

    val sessionAggrStatAccumulator = new SessionAggrStatAccumulator

  • 2:自定義累加器實現

      class SessionAggrStatAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]] {
    
          // 儲存所有聚合資料
          private val aggrStatMap = mutable.HashMap[String, Int]()
        
          override def isZero: Boolean = {
            aggrStatMap.isEmpty
          }
        
          override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
            val newAcc = new SessionAggrStatAccumulator
            aggrStatMap.synchronized{
              newAcc.aggrStatMap ++= this.aggrStatMap
            }
            newAcc
          }
        
          override def reset(): Unit = {
            aggrStatMap.clear()
          }
        
          override def add(v: String): Unit = {
            if (!aggrStatMap.contains(v))
              aggrStatMap += (v -> 0)
            aggrStatMap.update(v, aggrStatMap(v) + 1)
          }
        
          override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
            other match {
              case acc:SessionAggrStatAccumulator => {
                (this.aggrStatMap /: acc.value){ case (map, (k,v)) => map += ( k -> (v + map.getOrElse(k, 0)) )}
              }
            }
          }
        
          override def value: mutable.HashMap[String, Int] = {
            this.aggrStatMap
          }
    }
    複製程式碼
  • 3:註冊自定義累加器

     sc.register(sessionAggrStatAccumulator, "sessionAggrStatAccumulator")
    複製程式碼
  • 4:累加器值得獲取與使用

      calculateAndPersistAggrStat(spark, sessionAggrStatAccumulator.value, taskUUID)
      
      sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
    複製程式碼

3 Flink 累加器使用案例實戰

3.1 注意:只有在任務執行結束後,才能獲取到累加器的值

3.2 注意:RichMapFunction的使用發生在高階特性上。

public class BatchDemoCounter {

    public static void main(String[] args) throws Exception{

        //獲取執行環境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            //1:建立累加器
           private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:註冊累加器
                getRuntimeContext().addAccumulator("num-lines",this.numLines);

            }

            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                //如果並行度為1,使用普通的累加求和即可,但是設定多個並行度,則普通的累加求和結果就不準了
                //sum++;
                //System.out.println("sum:"+sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);

        //result.print();

        result.writeAsText("d:\\BatchDemoCounter");

        JobExecutionResult jobResult = env.execute("counter");
        //3:獲取累加器
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:"+num);

    }
}
複製程式碼

3.3 結果展示(writeAsText觸發後才能讀取到counter)

也即提前執行了一波action操作,然後才能取出其值。

    num:4
複製程式碼

總結

微觀看世界,本文雖小,五臟俱全。為了成一套體系,不得已而為之,請期待後面有關Spark和kafka的原始碼解讀系列。辛苦成文,彼此珍惜,謝謝。

秦凱新 於深圳 201811251651