1. 程式人生 > >SparkStreaming中reduceByKeyAndWindow運算元的使用

SparkStreaming中reduceByKeyAndWindow運算元的使用

這裡寫圖片描述

截圖自官網,例如每個方塊代表5秒鐘,上面的虛線框住的是3個視窗就是15秒鐘,這裡的15秒鐘就是視窗的長度,其中虛線到實線移動了2個方塊表示10秒鐘,這裡的10秒鐘就表示每隔10秒計算一次視窗長度的資料

舉個例子: 如下圖

這裡寫圖片描述

我是這樣理解的:如果這裡是使用視窗函式計算wordcount 在第一個視窗(虛線視窗)計算出來(aa, 1)(bb,3)(cc,1)當到達時間10秒後窗口移動到實線視窗,就會計算這個實線視窗中的單詞,這裡就為(bb,1)(cc,2)(aa,1)

附上程式:

注意:視窗滑動長度和視窗長度一定要是SparkStreaming微批處理時間的整數倍,不然會報錯.

package cn.lijie.kafka

import
cn.lijie.MyLog import org.apache.log4j.Level import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} /** * User: lijie * Date: 2017/8/8 * Time: 14:04 */ object SparkWindowDemo { val myfunc = (it: Iterator[(String, Seq[Int], Option[Int])])
=>
{ it.map(x => { (x._1, x._2.sum + x._3.getOrElse(0)) }) } def main(args: Array[String]): Unit = { MyLog.setLogLeavel(Level.WARN) val conf = new SparkConf().setMaster("local[2]").setAppName("window") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(2
)) sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\myck01") val ds = ssc.socketTextStream("192.168.80.123", 9999) //Seconds(5)表示視窗的寬度 Seconds(3)表示多久滑動一次(滑動的時間長度) val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10)) // 視窗長度和滑動的長度一致,那麼類似於每次計算自己批量的資料,用updateStateByKey也可以累計計算單詞的wordcount 這裡只是做個是實驗 // val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(4)).updateStateByKey(myfunc, new HashPartitioner(sc.defaultParallelism), true) re.print() ssc.start() ssc.awaitTermination() } }