Spark-Streaming updateStateByKey用法(計算累加值)、並與kafka整合使用
阿新 • • 發佈:2019-02-05
說明
Spark Streaming的updateStateByKey可以DStream中的資料進行按key做reduce操作,然後對各個批次的資料進行累加。
計算word count所有批次的累加值。
import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.kafka.KafkaUtils object sparkUpdateState { def main(args: Array[String]): Unit = { //由於日誌資訊較多,只打印錯誤日誌資訊 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val conf = new SparkConf().setAppName("dstream").setMaster("local[*]") val ssc = new StreamingContext(conf,Seconds(1)) //使用updateStateByKey前需要設定checkpoint,將資料進行持久化儲存,不然每次執行都是新的,不會與歷史資料進行關聯 // ssc.checkpoint("f:/spark_out") //將資料儲存在hdfs中 ssc.checkpoint("hdfs://192.168.200.10:9000/spark_out") //與kafka做整合,使用KafkaUtils類進行引數配置 val(zkQuorum,groupId,topics)=("192.168.200.10:2181","kafka_group",Map("sparkTokafka"->1)) val value: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topics) //將updateFunc轉換為運算元 val updateFunc2 = updateFunc _ //統計輸入的字串,根據空格進行切割統計 value.flatMap(_._2.split(" ")).map((_,1)).updateStateByKey[Int](updateFunc2).print() ssc.start() ssc.awaitTermination() } def updateFunc(seq:Seq[Int], option:Option[Int])={ //sum統計一次批處理後,單詞統計 var sum=seq.sum; //i為已經累計的值,因為option可能為空,如果為空的話,返回的是None,所以如果為空則返回0 val i = option.getOrElse(0) // 返回累加後的結果,是一個Option[Int]型別 Some(sum+i) } }
執行效果: