Flink筆記14:Flink之window起始點的確定與watermark使用詳解
1、window起始時間的確定
在TimeWindow.java中有如下方法來確定window的起始時間
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
假設我設定瞭如下視窗,並將第一條資料的時間戳設定為1547718199。
timeWindow(Time.seconds( 15))
那麼經過計算,得到起始時間戳為1547718195,第一個視窗就是[ 1547718195, 1547718210 )
1547718199-(1547718199-0+15)%15=1547718195
2、watermark使用詳解
object WindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime)
//每50ms生成一個watermark
env.getConfig.setAutoWatermarkInterval(50)
// 讀取資料
val inputStream = env.socketTextStream("localhost", 7777)
// 先轉換成樣例類型別(簡單轉換操作)
val dataStream = inputStream
.map( data => {
val arr = data.split("," )
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
} )
// .assignAscendingTimestamps(_.timestamp * 1000L) // 升序資料提取時間戳
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
val latetag = new OutputTag[(String, Double, Long)]("late")
// 每15秒統計一次,視窗內各感測器所有溫度的最小值,以及最新的時間戳
val resultStream = dataStream
.map( data => (data.id, data.temperature, data.timestamp) )
.keyBy(_._1) // 按照二元組的第一個元素(id)分組
.timeWindow(Time.seconds(15))
.allowedLateness(Time.seconds(5))
.sideOutputLateData(latetag)
.reduce( (curRes, newData) => (curRes._1, curRes._2.min(newData._2), newData._3) )
resultStream.getSideOutput(latetag).print("late")
resultStream.print("result")
env.execute("window test")
}
}
// 定義樣例類,溫度感測器
case class SensorReading( id: String, timestamp: Long, temperature: Double )
上述程式碼是每15秒統計一次視窗內各感測器所有溫度的最小值,以及最新的時間戳。watermark延遲為3秒,允許遲到5秒,之後再遲到的話會被放入側輸出流。
假設我有如下資料
sensor_1,1547718199,35.8
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
sensor_1,1547718200,25.6
sensor_1,1547718218,27.2
sensor_1,1547718205,31.7
將這些資料一個個的通過nc輸入,直到第五條資料才有結果輸出。
我的第一條資料的時間戳是1547718199,經過計算,得到起始時間戳為1547718195,由於視窗大小為15s,所以第一個視窗就是[ 1547718195, 1547718210 )。當資料的時間戳達到視窗的臨界點1547718210時就會觸發視窗計算,輸出結果。那麼第四條資料出現的時候就該觸發計算操作,但由於設定了watermark延遲時間為3s,所以只有在當前資料時間戳減去3,大於等於1547718210時才會觸發計算,即第五條資料才觸發計算。由於視窗左閉右開,所以就是計算前三條資料的最小溫度和最大時間戳。
當輸入第六條資料時又進行了一次計算,這是因為設定了允許遲到5秒。當前最大時間戳是1547718213,watermark就是1547718210,加5秒延遲就是1547718215。也就是隻要當前watermark小於1547718215,或者說當前時間戳小於1547718218,就可以接收[ 1547718195, 1547718210 )中的延遲資料。
再新增一條時間戳為1547718218的資料,沒有輸出,因為這條資料不屬於[ 1547718195, 1547718210 )視窗,又沒有達到觸發下一個視窗[ 1547718210, 1547718225 )計算的條件。
最後再新增一條時間戳為1547718205的資料,由於上一條資料的時間戳為1547718218,減去允許延遲的5秒和watermark延遲3秒,就是1547718210 ,達到[ 1547718195, 1547718210 )視窗臨界,此視窗關閉。所以這條資料被放入側輸出流,作為遲到資料,後續再處理。
生成watermark的時間間隔
原始碼:
預設的watermark生成時間間隔,如果是ProcessingTime為0,其他則為200ms(如EventTime)。如果想自定義時間間隔可以通過如下程式碼實現:
env.getConfig.setAutoWatermarkInterval(50)
BoundedOutOfOrdernessTimestampExtractor實現了AssignerWithPeriodicWatermarks,所以它也是週期生成watermark的。
疑問:
當時間戳為1547718218的資料進入時,此時的watermark應該是1547718215,減去5秒延遲為1547718210,導致[ 1547718195, 1547718210 )視窗關閉,後面來的1547718205資料就成了遲到資料。
如果將watermark生成的時間間隔設定的很大,當時間戳為1547718218的資料進入時,watermark還沒有更新,還是資料1547718213時的watermark 1547718210,減去5秒,為1547718205,此時[ 1547718195, 1547718210 )視窗就不會關閉,那麼1547718205資料能正常進入視窗嗎?