1. 程式人生 > >SparkStreaming(8):windows視窗操作

SparkStreaming(8):windows視窗操作

1.概念

   在一定的時間間隔(interval)進行一個時間段(window length)內的資料處理。

【參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html

2.核心

(1)window length  : 視窗的長度(下圖是3)

(2)sliding interval: 視窗的間隔(下圖是2)

(3)這2個引數和Streaming的batch size都是倍數關係,否則會報錯!

3.例項(官方)

  每10s計算前30s的資料

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

     【注意:】

      Seconds(30), //視窗大小,指定計算最近多久的資料量,要求是父DStream的批次產生時間的整數倍
      Seconds(10) //滑動大小/新的DStream批次產生間隔時間,就是幾秒鐘來一次資料,要求是父DStream的批次產生時間的整數倍

4.例項程式碼

(1)原始碼

package _0809kafka

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Administrator on 2018/10/20.
  */
object WindowsReduceStream_simple_1020 {
  def main(args: Array[String]): Unit = {
    val sparkconf=new SparkConf().setMaster("local[2]").setAppName("WindowsReduceStream_simple_1020")
    val sc=new  SparkContext(sparkconf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val checkpointPathDir = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_08"
    ssc.checkpoint(checkpointPathDir)
    val dstream = ssc.socketTextStream("bigdata.ibeifeng.com", 9999)
    val batchResultDStream = dstream.flatMap(_.split(" ")).map(word => {
      (word,1)
    }).reduceByKey(_ + _)
    val resultDStream: DStream[(String, Int)] = batchResultDStream.reduceByKeyAndWindow(
      (a:Int,b:Int) => a+b,
      Seconds(6), //視窗大小,指定計算最近多久的資料量,要求是父DStream的批次產生時間的整數倍
      Seconds(2) //滑動大小/新的DStream批次產生間隔時間,就是幾秒鐘來一次資料,要求是父DStream的批次產生時間的整數倍
    )
    resultDStream.print()
    ssc.start()             // 啟動
    ssc.awaitTermination()

  }
}

(2)測試

  -》開啟9999埠

nc -lt 9999

  -》開啟程式
  -》結果:

	
	-------------------------------------------
	Time: 1540020870000 ms
	-------------------------------------------
	(hadoophadoop,15)
	(hadoop,60)
	(ccs,45)

(測試成功!)