SparkStreaming(8):windows視窗操作
阿新 • • 發佈:2018-11-08
1.概念
在一定的時間間隔(interval)進行一個時間段(window length)內的資料處理。
【參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html】
2.核心
(1)window length : 視窗的長度(下圖是3)
(2)sliding interval: 視窗的間隔(下圖是2)
(3)這2個引數和Streaming的batch size都是倍數關係,否則會報錯!
3.例項(官方)
每10s計算前30s的資料
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
【注意:】
Seconds(30), //視窗大小,指定計算最近多久的資料量,要求是父DStream的批次產生時間的整數倍
Seconds(10) //滑動大小/新的DStream批次產生間隔時間,就是幾秒鐘來一次資料,要求是父DStream的批次產生時間的整數倍
4.例項程式碼
(1)原始碼
package _0809kafka import org.apache.spark.streaming.dstream.DStream import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by Administrator on 2018/10/20. */ object WindowsReduceStream_simple_1020 { def main(args: Array[String]): Unit = { val sparkconf=new SparkConf().setMaster("local[2]").setAppName("WindowsReduceStream_simple_1020") val sc=new SparkContext(sparkconf) val ssc = new StreamingContext(sc, Seconds(2)) val checkpointPathDir = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_08" ssc.checkpoint(checkpointPathDir) val dstream = ssc.socketTextStream("bigdata.ibeifeng.com", 9999) val batchResultDStream = dstream.flatMap(_.split(" ")).map(word => { (word,1) }).reduceByKey(_ + _) val resultDStream: DStream[(String, Int)] = batchResultDStream.reduceByKeyAndWindow( (a:Int,b:Int) => a+b, Seconds(6), //視窗大小,指定計算最近多久的資料量,要求是父DStream的批次產生時間的整數倍 Seconds(2) //滑動大小/新的DStream批次產生間隔時間,就是幾秒鐘來一次資料,要求是父DStream的批次產生時間的整數倍 ) resultDStream.print() ssc.start() // 啟動 ssc.awaitTermination() } }
(2)測試
-》開啟9999埠
nc -lt 9999
-》開啟程式
-》結果:
-------------------------------------------
Time: 1540020870000 ms
-------------------------------------------
(hadoophadoop,15)
(hadoop,60)
(ccs,45)