kafka的receive方式實現WordCount,使用updateStateByKey函式,累加所有批次的wordCount
阿新 • • 發佈:2018-11-11
Spark Streaming的updateStateByKey可以把DStream中的資料按key做reduce操作,然後對各個批次的資料進行累加。注意
wordDstream.updateStateByKey[Int]每次傳遞給updateFunc函式兩個引數,其中,
1、第一個引數是某個key(即某個單詞)的當前批次的一系列值的列表(Seq[Int]形式),updateFunc函式中 val currentCount = values.foldLeft(0)(_ + _)的作用,就是計算這個被傳遞進來的與某個key對應的當前批次的所有值的總和,也就是當前批次某個單詞的出現次數,儲存在變數currentCount中。
2、傳遞給updateFunc函式的第二個引數是某個key的歷史狀態資訊,也就是某個單詞歷史批次的詞頻彙總結果。實際上,某個單詞的歷史詞頻應該是一個Int型別,這裡為什麼要採用Option[Int]呢?
Option[Int]是型別 Int的容器,更確切地說,你可以把它看作是某種集合,這個特殊的集合要麼只包含一個元素(即單詞的歷史詞頻),要麼就什麼元素都沒有(這個單詞歷史上沒有出現過,所以沒有歷史詞頻資訊)。之所以採用 Option[Int]儲存歷史詞頻資訊,這是因為,歷史詞頻可能不存在,很多時候,在值不存在時,需要進行回退,或者提供一個預設值,Scala 為Option型別提供了getOrElse方法,以應對這種情況。 state.getOrElse(0)的含義是,如果該單詞沒有歷史詞頻統計彙總結果,那麼,就取值為0,如果有歷史詞頻統計結果,就取歷史結果,然後賦值給變數previousCount。最後,當前值和歷史值進行求和,幷包裝在Some中返回。
package day01
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
object kafkaConsumerDemo01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
//new StreamingContext, 設定每個批次的時間間隔是5秒
val ssc =new StreamingContext(conf,Duration(5000))
//zookeeper地址,通過zookeeper獲取元資料
val zkQurme = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
//使用checkPoint,儲存中間的結果
ssc.checkpoint ("D:\\Yue\\chk_0")
val groupid = "tt01"//組名
val topics = Map("test02"->2)//要消費的topic名,和執行緒數,可以有多個topic
//建立一個ReceiverInputDStream,相當於kafka的消費者
val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQurme,groupid,topics)
val fm: DStream[String] = kafkaDStream.flatMap(_._2.split(" "))
val maped: DStream[(String, Int)] = fm.map(x=>(x,1))
//定義一個函式seq代表當前批次的某個key的所對應的value的集合
//state是以前讀取的這個key的value的總和
val updateFunc =(seq:Seq[Int],state:Option[Int])=>{
val crunt: Int = seq.foldLeft(0)(_+_)
//使用foldLeft,進行累加,不能使用reduceLeft/reduce 會報 empty.reduceLeft
val sum = crunt + state.getOrElse(0)
Some(sum)
}
val update: DStream[(String, Int)] = maped.updateStateByKey(updateFunc)
update.print()
ssc.start()
ssc.awaitTermination()
}
}