1. 程式人生 > 其它 >Flink筆記14:Flink之window起始點的確定與watermark使用詳解

Flink筆記14:Flink之window起始點的確定與watermark使用詳解

技術標籤:Flinkflink大資料scala

1、window起始時間的確定

在TimeWindow.java中有如下方法來確定window的起始時間

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
	return timestamp - (timestamp - offset + windowSize) % windowSize;
}

假設我設定瞭如下視窗,並將第一條資料的時間戳設定為1547718199。

timeWindow(Time.seconds(
15))

那麼經過計算,得到起始時間戳為1547718195,第一個視窗就是[ 1547718195, 1547718210 )

1547718199-(1547718199-0+15)%15=1547718195

2、watermark使用詳解

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(
TimeCharacteristic.EventTime) //每50ms生成一個watermark env.getConfig.setAutoWatermarkInterval(50) // 讀取資料 val inputStream = env.socketTextStream("localhost", 7777) // 先轉換成樣例類型別(簡單轉換操作) val dataStream = inputStream .map( data => { val arr = data.split(","
) SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) } ) // .assignAscendingTimestamps(_.timestamp * 1000L) // 升序資料提取時間戳 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L }) val latetag = new OutputTag[(String, Double, Long)]("late") // 每15秒統計一次,視窗內各感測器所有溫度的最小值,以及最新的時間戳 val resultStream = dataStream .map( data => (data.id, data.temperature, data.timestamp) ) .keyBy(_._1) // 按照二元組的第一個元素(id)分組 .timeWindow(Time.seconds(15)) .allowedLateness(Time.seconds(5)) .sideOutputLateData(latetag) .reduce( (curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3) ) resultStream.getSideOutput(latetag).print("late") resultStream.print("result") env.execute("window test") } } // 定義樣例類,溫度感測器 case class SensorReading( id: String, timestamp: Long, temperature: Double )

上述程式碼是每15秒統計一次視窗內各感測器所有溫度的最小值,以及最新的時間戳。watermark延遲為3秒,允許遲到5秒,之後再遲到的話會被放入側輸出流。

假設我有如下資料

sensor_1,1547718199,35.8
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
sensor_1,1547718200,25.6
sensor_1,1547718218,27.2
sensor_1,1547718205,31.7

將這些資料一個個的通過nc輸入,直到第五條資料才有結果輸出。
我的第一條資料的時間戳是1547718199,經過計算,得到起始時間戳為1547718195,由於視窗大小為15s,所以第一個視窗就是[ 1547718195, 1547718210 )。當資料的時間戳達到視窗的臨界點1547718210時就會觸發視窗計算,輸出結果。那麼第四條資料出現的時候就該觸發計算操作,但由於設定了watermark延遲時間為3s,所以只有在當前資料時間戳減去3,大於等於1547718210時才會觸發計算,即第五條資料才觸發計算。由於視窗左閉右開,所以就是計算前三條資料的最小溫度和最大時間戳。
在這裡插入圖片描述
當輸入第六條資料時又進行了一次計算,這是因為設定了允許遲到5秒。當前最大時間戳是1547718213,watermark就是1547718210,加5秒延遲就是1547718215。也就是隻要當前watermark小於1547718215,或者說當前時間戳小於1547718218,就可以接收[ 1547718195, 1547718210 )中的延遲資料。
在這裡插入圖片描述
再新增一條時間戳為1547718218的資料,沒有輸出,因為這條資料不屬於[ 1547718195, 1547718210 )視窗,又沒有達到觸發下一個視窗[ 1547718210, 1547718225 )計算的條件。
在這裡插入圖片描述
最後再新增一條時間戳為1547718205的資料,由於上一條資料的時間戳為1547718218,減去允許延遲的5秒和watermark延遲3秒,就是1547718210 ,達到[ 1547718195, 1547718210 )視窗臨界,此視窗關閉。所以這條資料被放入側輸出流,作為遲到資料,後續再處理。
在這裡插入圖片描述

生成watermark的時間間隔

原始碼:
在這裡插入圖片描述
預設的watermark生成時間間隔,如果是ProcessingTime為0,其他則為200ms(如EventTime)。如果想自定義時間間隔可以通過如下程式碼實現:

env.getConfig.setAutoWatermarkInterval(50)

BoundedOutOfOrdernessTimestampExtractor實現了AssignerWithPeriodicWatermarks,所以它也是週期生成watermark的。

疑問:
當時間戳為1547718218的資料進入時,此時的watermark應該是1547718215,減去5秒延遲為1547718210,導致[ 1547718195, 1547718210 )視窗關閉,後面來的1547718205資料就成了遲到資料。

如果將watermark生成的時間間隔設定的很大,當時間戳為1547718218的資料進入時,watermark還沒有更新,還是資料1547718213時的watermark 1547718210,減去5秒,為1547718205,此時[ 1547718195, 1547718210 )視窗就不會關閉,那麼1547718205資料能正常進入視窗嗎?