Flink DataSet API: Accumulators & Counters
Basic Introduction
1. An Accumulator in Flink is much like a MapReduce counter: both give you a good view of how data changes while tasks run. You can update an accumulator inside the operator functions of a Flink job, but the final result is only available after the job has finished.
2. A Counter is a concrete Accumulator implementation, for example IntCounter, LongCounter, and DoubleCounter, as sketched below.
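To make these counter types concrete, here is a minimal local sketch (the object name CounterTypes is made up for this example, and the counters are not attached to any job): all three built-in counters follow the same add/getLocalValue contract from the Accumulator interface.

import org.apache.flink.api.common.accumulators.{DoubleCounter, IntCounter, LongCounter}

object CounterTypes {
  def main(args: Array[String]): Unit = {
    val intCounter    = new IntCounter()
    val longCounter   = new LongCounter()
    val doubleCounter = new DoubleCounter()

    intCounter.add(1)       // increment by an Int
    longCounter.add(100L)   // increment by a Long
    doubleCounter.add(0.5)  // increment by a Double

    // getLocalValue returns only this instance's local value; in a running
    // job, Flink merges the per-task instances into the final job result.
    println(intCounter.getLocalValue)    // 1
    println(longCounter.getLocalValue)   // 100
    println(doubleCounter.getLocalValue) // 0.5
  }
}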
Usage
1. Create the accumulator: private IntCounter numLines = new IntCounter();
2. Register the accumulator: getRuntimeContext().addAccumulator("num-lines", this.numLines);
3. Use the accumulator: this.numLines.add(1);
4. Get the accumulator's result: myJobExecutionResult.getAccumulatorResult("num-lines")
Demo
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object BatchDemoCounter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val data = env.fromElements("a", "b", "c", "d")
    val res = data.map(new RichMapFunction[String, String] {
      val numLines = new IntCounter() // 1. create the accumulator
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        getRuntimeContext.addAccumulator("num-lines", numLines) // 2. register it
      }
      override def map(in: String): String = {
        this.numLines.add(1) // 3. use it: count one element per map call
        in
      }
    }).setParallelism(4)
    res.writeAsText("d:/sshdata/count").setParallelism(1)
    val jobResult = env.execute("BatchDemoCounter")
    val num = jobResult.getAccumulatorResult[Int]("num-lines") // 4. get the result
    println(num)
  }
}
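The printed result is 4 (one per input element) even though the map operator runs with parallelism 4, because Flink merges each parallel task's local counter into one final value. If a job registers several accumulators, you can also read them all at once via getAllAccumulatorResults() on the JobExecutionResult; a short sketch continuing from the jobResult above:

import scala.collection.JavaConverters._

// Read every registered accumulator from the jobResult obtained above;
// in this demo only "num-lines" exists.
val all = jobResult.getAllAccumulatorResults // java.util.Map[String, AnyRef]
all.asScala.foreach { case (name, value) =>
  println(s"$name = $value")
}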