1. 程式人生 > >kafka的receive方式實現WordCount,使用updateStateByKey函式,累加所有批次的wordCount

kafka的receive方式實現WordCount,使用updateStateByKey函式,累加所有批次的wordCount

Spark Streaming的updateStateByKey可以把DStream中的資料按key做reduce操作,然後對各個批次的資料進行累加。注意
wordDstream.updateStateByKey[Int]每次傳遞給updateFunc函式兩個引數,其中,
1、第一個引數是某個key(即某個單詞)的當前批次的一系列值的列表(Seq[Int]形式),updateFunc函式中 val currentCount = values.foldLeft(0)(_ + _)的作用,就是計算這個被傳遞進來的與某個key對應的當前批次的所有值的總和,也就是當前批次某個單詞的出現次數,儲存在變數currentCount中。
2、傳遞給updateFunc函式的第二個引數是某個key的歷史狀態資訊,也就是某個單詞歷史批次的詞頻彙總結果。實際上,某個單詞的歷史詞頻應該是一個Int型別,這裡為什麼要採用Option[Int]呢?
Option[Int]是型別 Int的容器,更確切地說,你可以把它看作是某種集合,這個特殊的集合要麼只包含一個元素(即單詞的歷史詞頻),要麼就什麼元素都沒有(這個單詞歷史上沒有出現過,所以沒有歷史詞頻資訊)。之所以採用 Option[Int]儲存歷史詞頻資訊,這是因為,歷史詞頻可能不存在,很多時候,在值不存在時,需要進行回退,或者提供一個預設值,Scala 為Option型別提供了getOrElse方法,以應對這種情況。 state.getOrElse(0)的含義是,如果該單詞沒有歷史詞頻統計彙總結果,那麼,就取值為0,如果有歷史詞頻統計結果,就取歷史結果,然後賦值給變數previousCount。最後,當前值和歷史值進行求和,幷包裝在Some中返回。

package day01

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}

object kafkaConsumerDemo01 {
  def main(args: Array[String]):
Unit = { val conf = new SparkConf().setAppName("wc").setMaster("local[2]") //new StreamingContext, 設定每個批次的時間間隔是5秒 val ssc =new StreamingContext(conf,Duration(5000)) //zookeeper地址,通過zookeeper獲取元資料 val zkQurme = "hadoop01:2181,hadoop02:2181,hadoop03:2181" //使用checkPoint,儲存中間的結果 ssc.checkpoint
("D:\\Yue\\chk_0") val groupid = "tt01"//組名 val topics = Map("test02"->2)//要消費的topic名,和執行緒數,可以有多個topic //建立一個ReceiverInputDStream,相當於kafka的消費者 val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQurme,groupid,topics) val fm: DStream[String] = kafkaDStream.flatMap(_._2.split(" ")) val maped: DStream[(String, Int)] = fm.map(x=>(x,1)) //定義一個函式seq代表當前批次的某個key的所對應的value的集合 //state是以前讀取的這個key的value的總和 val updateFunc =(seq:Seq[Int],state:Option[Int])=>{ val crunt: Int = seq.foldLeft(0)(_+_) //使用foldLeft,進行累加,不能使用reduceLeft/reduce 會報 empty.reduceLeft val sum = crunt + state.getOrElse(0) Some(sum) } val update: DStream[(String, Int)] = maped.updateStateByKey(updateFunc) update.print() ssc.start() ssc.awaitTermination() } }