SparkStreaming中reduceByKeyAndWindow運算元的使用
阿新 • • 發佈:2018-12-15
截圖自官網,例如每個方塊代表5秒鐘,上面的虛線框住的是3個視窗就是15秒鐘,這裡的15秒鐘就是視窗的長度,其中虛線到實線移動了2個方塊表示10秒鐘,這裡的10秒鐘就表示每隔10秒計算一次視窗長度的資料
舉個例子: 如下圖
我是這樣理解的:如果這裡是使用視窗函式計算wordcount 在第一個視窗(虛線視窗)計算出來(aa, 1)(bb,3)(cc,1)當到達時間10秒後窗口移動到實線視窗,就會計算這個實線視窗中的單詞,這裡就為(bb,1)(cc,2)(aa,1)
附上程式:
注意:視窗滑動長度和視窗長度一定要是SparkStreaming微批處理時間的整數倍,不然會報錯.
package cn.lijie.kafka
import cn.lijie.MyLog
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
* User: lijie
* Date: 2017/8/8
* Time: 14:04
*/
object SparkWindowDemo {
val myfunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
it.map(x => {
(x._1, x._2.sum + x._3.getOrElse(0))
})
}
def main(args: Array[String]): Unit = {
MyLog.setLogLeavel(Level.WARN)
val conf = new SparkConf().setMaster("local[2]").setAppName("window")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2 ))
sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\myck01")
val ds = ssc.socketTextStream("192.168.80.123", 9999)
//Seconds(5)表示視窗的寬度 Seconds(3)表示多久滑動一次(滑動的時間長度)
val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(10))
// 視窗長度和滑動的長度一致,那麼類似於每次計算自己批量的資料,用updateStateByKey也可以累計計算單詞的wordcount 這裡只是做個是實驗
// val re = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(4)).updateStateByKey(myfunc, new HashPartitioner(sc.defaultParallelism), true)
re.print()
ssc.start()
ssc.awaitTermination()
}
}