1. 程式人生 > WINDOWS開發 >Flink 視窗 window

Flink 視窗 window

一、基本概念

1.視窗分類

  TimeWindow:按照時間生成 Window。對於 TimeWindow,可以根據視窗實現原理的不同分成三類:滾動視窗(TumblingWindow)、滑動視窗(Sliding Window)和會話視窗(Session Window)。

  CountWindow:按照指定的資料條數生成一個 Window,與時間無關。

2.時間分類

  Event Time:是事件建立的時間。它通常由事件中的時間戳描述,例如採集的日誌資料中,每一條日誌都會記錄自己的生成時間,Flink 通過時間戳分配器訪問事件時間戳。

  Ingestion Time:是資料進入 Flink 的時間。

  Processing Time:是每一個執行基於時間操作的運算元的本地系統時間,與機器相關,預設的時間屬性就是 Processing Time。 3.水位Watermark   由於事件產生的時間,和到達Flink的時間並不是完全有序的,可能先發生的時間卻後達到Flink。因此需要需要設定一個延遲時間t,視窗不是到達長度之後就觸發計算,而是到達長度+延遲t之後才觸發計算。而水位watermark就是視窗當前資料的時間戳減去延遲時間,表示小於watermark的資料都已經到達了(不含watermark)。

二、案例演示

案例1:按Processing Time劃分滾動時間視窗

  TimeWindow 是將指定時間範圍內的所有資料組成一個 window,一次對一個window 裡面的所有資料進行計算。Flink 預設的時間視窗根據 Processing Time 進行視窗的劃分,將 Flink 獲取到的資料根據進入 Flink 的時間劃分到不同的視窗中。時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。   在下面的程式碼中,因為沒有指定時間型別,所以採用了預設的Processing Time,即Flink實際計算資料的時間,通過.timeWindow(Time.seconds(10))設定視窗的大小為10秒,當一條資料進來後開始計時,10秒之後輸出這個視窗中所有資料的計算結果。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val socketStream = env.socketTextStream("hadoop102",7777)

    val dataStream: DataStream[SensorReading] = socketStream.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim,arr(1).trim.toLong,arr(2).toDouble)
    })

    //統計10秒內的最小溫度
    val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10)) //10秒滾動視窗,不指定時間特性,預設為ProcessingTime
      .reduce((data1,data2)=>(data1._1,data1._2.min(data2._2)))

    //列印原始的dataStream
    dataStream.print("data stream")

    //列印視窗資料流
    minTemperatureStream.print("min temperature")

    env.execute("window test")

  }

}

  測試:  

  連續輸入兩條資料

[[email protected] ~]$ nc -lk 7777
sensor_1,1547718200,30.8
sensor_1,1547718201,40.8

  在一個10秒的滾動視窗內,視窗流minTemperatureStream只輸出了一條資料。此時觸發TimeWindow去計算的時機就是第一條資料來的10秒過後。

data stream> SensorReading(sensor_1,1547718200,30.8)
data stream> SensorReading(sensor_1,1547718201,40.8)
min temperature
> (sensor_1,30.8)

案例2:帶水位的滾動時間視窗

  程式碼分析:

  ①通過env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)設定視窗的時間特性為事件時間。

  ②在assignTimestampsAndWatermarks()方法中,傳遞一個BoundedOutOfOrdernessTimestampExtractor類實現物件,構造器引數就是容忍的延遲時間,實現方法,指明時間戳用哪個欄位。

object WindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val socketStream = env.socketTextStream("hadoop102",7777)

    val dataStream: DataStream[SensorReading] = socketStream
      .map(d => {
        val arr = d.split(",")
        SensorReading(arr(0).trim,arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
        override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
      })

      //.assignAscendingTimestamps(_.timestamp) //升序資料添指定時間戳

    //統計5秒內的最小溫度
    val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5)) //5秒滾動視窗
      .reduce((data1,data1._2.min(data2._2)))

    //列印原始的dataStream
    dataStream.print("data stream")

    //列印視窗資料流
    minTemperatureStream.print("min temperature")

    env.execute("window test")

  }

}

  測試:

  當輸入第一條資料時,時間戳是1547718200(單位秒),因為視窗的長度為5,所以理論上當時間戳為1547718205的資料來後,視窗會列印輸出,但是由於設定了延遲2秒,所以此時水位才到1547718203,所以只有當時間戳為1547718207或之後的資料到來,水位線漲到大於等於1547718205時,窗口才會觸發計算並關閉。

  sockt輸入資料如下

[[email protected] ~]$ nc -lk 7777
sensor_1,31         
sensor_1,1547718202,32  
sensor_1,1547718203,33  
sensor_1,1547718204,34  
sensor_1,1547718205,35  
sensor_1,1547718206,36  
sensor_1,1547718207,37  
sensor_1,1547718208,38

  控制檯列印如下:

data stream> SensorReading(sensor_1,31.0)
data stream> SensorReading(sensor_1,1547718202,32.0)
data stream> SensorReading(sensor_1,1547718203,33.0)
data stream> SensorReading(sensor_1,1547718204,34.0)
data stream> SensorReading(sensor_1,1547718205,35.0)
data stream> SensorReading(sensor_1,1547718206,36.0)
data stream> SensorReading(sensor_1,1547718207,37.0)
min temperature> (sensor_1,1547718208,38.0)

案例3:滑動時間視窗

  滑動視窗和滾動視窗特性類似,滾動視窗可以看作一種特殊的滑動視窗,其視窗長度與滑動長度一樣。在.timeWindow(Time.seconds(10),Time.seconds(5)) 方法中,設定了視窗的長度為10,滑動長度為5。視窗長度決定了視窗計算的資料的範圍有多大,而滑動長度決定了視窗計算並關閉的時機。

//統計10秒內的最小溫度,5秒輸出一次
val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10),Time.seconds(5)) //滑動視窗
  .reduce((data1,data1._2.min(data2._2)))