Flink 視窗 window
一、基本概念
1.視窗分類
TimeWindow:按照時間生成 Window。對於 TimeWindow,可以根據視窗實現原理的不同分成三類:滾動視窗(TumblingWindow)、滑動視窗(Sliding Window)和會話視窗(Session Window)。
CountWindow:按照指定的資料條數生成一個 Window,與時間無關。2.時間分類
Event Time:是事件建立的時間。它通常由事件中的時間戳描述,例如採集的日誌資料中,每一條日誌都會記錄自己的生成時間,Flink 通過時間戳分配器訪問事件時間戳。
Ingestion Time:是資料進入 Flink 的時間。
二、案例演示
案例1:按Processing Time劃分滾動時間視窗
TimeWindow 是將指定時間範圍內的所有資料組成一個 window,一次對一個window 裡面的所有資料進行計算。Flink 預設的時間視窗根據 Processing Time 進行視窗的劃分,將 Flink 獲取到的資料根據進入 Flink 的時間劃分到不同的視窗中。時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。 在下面的程式碼中,因為沒有指定時間型別,所以採用了預設的Processing Time,即Flink實際計算資料的時間,通過.timeWindow(Time.seconds(10))設定視窗的大小為10秒,當一條資料進來後開始計時,10秒之後輸出這個視窗中所有資料的計算結果。import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val socketStream = env.socketTextStream("hadoop102",7777) val dataStream: DataStream[SensorReading] = socketStream.map(d => { val arr = d.split(",") SensorReading(arr(0).trim,arr(1).trim.toLong,arr(2).toDouble) }) //統計10秒內的最小溫度 val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(10)) //10秒滾動視窗,不指定時間特性,預設為ProcessingTime .reduce((data1,data2)=>(data1._1,data1._2.min(data2._2))) //列印原始的dataStream dataStream.print("data stream") //列印視窗資料流 minTemperatureStream.print("min temperature") env.execute("window test") } }
測試:
連續輸入兩條資料
[[email protected] ~]$ nc -lk 7777 sensor_1,1547718200,30.8 sensor_1,1547718201,40.8
在一個10秒的滾動視窗內,視窗流minTemperatureStream只輸出了一條資料。此時觸發TimeWindow去計算的時機就是第一條資料來的10秒過後。
data stream> SensorReading(sensor_1,1547718200,30.8) data stream> SensorReading(sensor_1,1547718201,40.8) min temperature> (sensor_1,30.8)
案例2:帶水位的滾動時間視窗
程式碼分析:
①通過env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)設定視窗的時間特性為事件時間。
②在assignTimestampsAndWatermarks()方法中,傳遞一個BoundedOutOfOrdernessTimestampExtractor類實現物件,構造器引數就是容忍的延遲時間,實現方法,指明時間戳用哪個欄位。
object WindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val socketStream = env.socketTextStream("hadoop102",7777) val dataStream: DataStream[SensorReading] = socketStream .map(d => { val arr = d.split(",") SensorReading(arr(0).trim,arr(2).toDouble) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) { override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000 }) //.assignAscendingTimestamps(_.timestamp) //升序資料添指定時間戳 //統計5秒內的最小溫度 val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(5)) //5秒滾動視窗 .reduce((data1,data1._2.min(data2._2))) //列印原始的dataStream dataStream.print("data stream") //列印視窗資料流 minTemperatureStream.print("min temperature") env.execute("window test") } }
測試:
當輸入第一條資料時,時間戳是1547718200(單位秒),因為視窗的長度為5,所以理論上當時間戳為1547718205的資料來後,視窗會列印輸出,但是由於設定了延遲2秒,所以此時水位才到1547718203,所以只有當時間戳為1547718207或之後的資料到來,水位線漲到大於等於1547718205時,窗口才會觸發計算並關閉。
sockt輸入資料如下
[[email protected] ~]$ nc -lk 7777 sensor_1,31 sensor_1,1547718202,32 sensor_1,1547718203,33 sensor_1,1547718204,34 sensor_1,1547718205,35 sensor_1,1547718206,36 sensor_1,1547718207,37 sensor_1,1547718208,38
控制檯列印如下:
data stream> SensorReading(sensor_1,31.0) data stream> SensorReading(sensor_1,1547718202,32.0) data stream> SensorReading(sensor_1,1547718203,33.0) data stream> SensorReading(sensor_1,1547718204,34.0) data stream> SensorReading(sensor_1,1547718205,35.0) data stream> SensorReading(sensor_1,1547718206,36.0) data stream> SensorReading(sensor_1,1547718207,37.0) min temperature> (sensor_1,1547718208,38.0)
案例3:滑動時間視窗
滑動視窗和滾動視窗特性類似,滾動視窗可以看作一種特殊的滑動視窗,其視窗長度與滑動長度一樣。在.timeWindow(Time.seconds(10),Time.seconds(5)) 方法中,設定了視窗的長度為10,滑動長度為5。視窗長度決定了視窗計算的資料的範圍有多大,而滑動長度決定了視窗計算並關閉的時機。
//統計10秒內的最小溫度,5秒輸出一次 val minTemperatureStream = dataStream.map(data=>(data.id,data.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(10),Time.seconds(5)) //滑動視窗 .reduce((data1,data1._2.min(data2._2)))