Flink之對時間的處理
window+trigger+watermark處理全域性亂序資料,指定視窗上的allowedLateness可以處理特定視窗操作的區域性事件時間亂序資料
1、流處理系統中的微批
Flink內部也使用了某種形式的微批處理技術,在shuffle階段將含有多個事件的緩衝容器通過網路傳送,而不是傳送單個事件
流處理系統中的批處理必須滿足以下兩點要求:
- 批處理只作為提高系統效能的機制。批量越大,系統的吞吐量就越大。
- 為了提高效能而使用的批處理必須完全獨立於定義視窗時所用的緩衝,或者為了保證容錯性而提交的程式碼,也不能作為 API 的一部分。否則,系統將受到限制,並且變得脆弱且難以使用。
2、時間概念
- 事件時間,即事件實際發生的時間(由水印觸發器實現),基於事件時間處理可實現時間回溯並正確地重新處理資料
- 處理時間,即事件被處理的時間,是處理事件的機器所測量的時間
- 攝取時間,即事件進入流處理框架的時間,缺乏事件時間的資料會被處理器附上攝取時間(由source函式完成)
3、視窗
所有內建視窗都由同一種機制實現,開窗機制與檢查點機制完全分離;可直接用基本的開窗機制定義更復雜的視窗(如某種時間視窗,可基於元素計數生成中間結果)
視窗時間區間是按自然時間分配的,比如3秒的時間間隔,[0,3) [3,6)
(1)時間視窗(每隔B時長對A時長內資料聚合)
- 設定事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- 設定處理時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
- 設定攝取時間 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
- 滾動視窗A stream.timeWindow(Time.minutes(1)) stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
- 滑動視窗B stream.timeWindow(Time.minutes(1), Time.seconds(30)) stream.window(TumblingEventTimeWindows.of(Time.seconds(1)), SlidingEventTimeWindows.of(Time.seconds(30)))
(2)計數視窗(每隔B個元素對A個元素進行聚合)
為避免永遠達不到計數視窗而浪費記憶體,可用時間視窗觸發超時
- 滾動視窗A stream.countWindow(4)
- 滑動視窗B stream.countWindow(4, 2)
(3)會話視窗(會話即活動階段,其前後都是非活躍階段,常用於無固定持續時間或無固定互動次數的場景)
由超時時間設定,即希望非活躍狀態持續多久才結束視窗。window區間:當b比上一條記錄a延遲超過超時時間t時,出現會話視窗[上一個window_end, b-t)
- 事件時間會話視窗 stream.window(EventTimeSessionWindows.withGap(Time.minutes(5))
- 處理時間會話視窗 stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(5))
處理延遲資料
- allowedLateness(Time.minutes(60))
縮短反饋時間(若使用者會話遲遲不結束,反饋時間過長)
- trigger(ContinuousEventTrigger.of(Time.minutes(10)) #每10分鐘輸出一個結果並覆蓋之前的
(4)全域性視窗(對全部資料進行統計,使用流方法實現批處理)
內建觸發器是NeverTrigger,永遠不會觸發,需要自定義觸發器才有意義
stream.window(GlobalWindows.create()).trigger(...)
4、觸發器
繼承Trigger類,Trigger抽象類的結構:
boolean canMerge()
void clear(W window, TriggerContext ctx)
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) 每個元素到來時執行
TriggerResult onEventTime(long time, W window, TriggerContext ctx) Timer到期後執行
void onMerge(W window, OnMergeContext ctx)
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
5、水印
視窗 + 水印,用於解決亂序問題(並不是解決,而是假定所有正常的事件都只是一定程度內亂序,可以解決此程度內的亂序)
當Watermark在紅色區域時,視窗內的元素會計算
(1)基於事件時間處理時,水印是判斷所有事件到達的標誌,開始計算和輸出結果,晚於處理時間但早於此水印時間的事件也可被正確處理。
水印定義最長遲到資料(比當前watermark還早的資料會被丟棄,水印閾值越大,允許的遲到資料越久)
watermark的值不是全域性的,但與key無關,有幾個並行,就有幾個watermark,window的觸發條件與最小的watermark有關
水印時間 = 收到的最大事件時間 - 水印閾值
一個操作運算元收到多個並行流的輸入時,取最小的watermark作為當前運算元的watermark
(2)異常情況:如果水印遲到得太久(可能是maxOrderness設定太大,也可能是後序事件過晚到達),收到結果的速度會變慢,解決方法是在水印到達之前輸出近似結果,其實就是後面設定Lateness的方案;如果水印到達得太早(可能是maxOrderness設定太小,也可能是後序事件過早到達),則可能丟失一些前序事件,收到錯誤結果,解決方法是採用Flink作業監控事件流,學習事件的遲到規律,以此構建水印模型
(3)分配Timestamp和Watermark
timestamp和watermark都是通過從1970年1月1日0時0分0秒到現在的毫秒數來指定的
先後順序:分配timstamp是按設定的時間間隔定時執行的,即使無資料進來也會執行,這就造成了getCurrentWatermark呼叫後看上去第一個watermark永遠是以0為基準計算顯示的 ,但實際並不是按那個算的。第2條的watermark如果是23的話,是不大於window_end 24的,也就不應該觸發,而如果是下一條的24則可以觸發。AssignerWithPeriodicWatermarks子類是每隔一段時間執行的,這個具體由ExecutionConfig.setAutoWatermarkInterval設定,如果沒有設定會幾乎沒有間隔地呼叫getCurrentWatermark方法。之所以會出現-10000時因為你沒有資料進入視窗,當然一直都是-10000,但是getCurrentWatermark方法不是在執行extractTimestamp後才執行的
直接在資料來源生成(推薦,資料生成時即分配timestamp和watermark)
實現SourceFuntion介面的run方法,並呼叫如下方法:
- 分配timestamp:SourceContext.collectWithTimestamp(...)
- 分配watermark:SourceContext.emitWatermark(new WaterMark(...))
獲取流後使用生成器生成新流(使用此種方式,會覆蓋源提供的timestamp和watermark,注意一定要在時間視窗之前生成)
stram.assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks 實現類物件)
定義分配器
AssignerWithPeriodicWatermarks(週期性水印,分配時間戳並定期生成水印)
watermark產生的事件間隔(每n毫秒)是通過ExecutionConfig.setAutoWatermarkInterval(...)來定義的,當getCurrentWatermark()被呼叫時,若返回的watermark非空且大於上一個watermark,則發射一個新的watermark
- 預定義實現類(使用時重寫extractTimestamp):
- AscendingTimestampExtractor 適用於時間戳遞增的情況
- BoundedOutOfOrdernessTimestampExtractor 適用於亂序但最大延遲已知的情況
- 自定義實現類(使用時重寫getCurrentWatermark、extractTimestamp)
AssignerWithPunctuatedWatermarks(帶斷點水印)
事件驅動生成水印,每個單獨的event都可以產生一個watermark,會有額外計算,過多可能導致效能降低。任何一個event都觸發extractTimestamp(...)來為元素分配一個timestamp,然後立即呼叫該元素上的checkAndGetNextWatermark(...)方法,一旦checkAndGetNextWatermark(...)返回一個非空的watermark並且watermark比前一個watermark大的話,這個新的watermark將會被髮送
(4)設定水印後觸發window的條件:
- watermark >= window_end(開啟多併發後,每個運算元接收到的watermark都會進行對齊,取最小的watermark作為最終的watermark並往下一個運算元傳送)
- 在[window_begin, window_end)中有資料存在
(5)不足之處
無法應對遲到資料,如果一個視窗已經被觸發了,即使滿足上述條件也不會第二次觸發視窗。水印被髮射到下一個運算元前已預設比水印更早的資料已經全部處理了
6、allowedLateness
主要用於解決遲到問題,給遲到資料第二次或多次觸發window的機會,可對無法觸發window的遲到資料單獨處理
預設情況下,watermark超過end-of-window後,將忽略之後到達的符合window的資料
在Watermark < 視窗結束時間 + Lateness時,仍會繼續等待視窗內的元素參與視窗計算,計算時要注意狀態值的重複,直到Watermark >= 視窗結束時間 + Lateness 時清空快取
要注意再次觸發視窗時,UDF中的狀態值的處理,要考慮state在計算時的去重問題
(1)
- 對於trigger是預設的EventTimeTrigger的情況,allowedLateness會再次觸發視窗的計算,而之前觸發的資料,會buffer起來,直到watermark超過end-of-window + allowedLateness的時間,視窗的資料及元資料資訊才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。
- 對於sessionWindow情況,當late element在allowedLateness範圍之內到達時,可能會引起視窗的merge,這樣,之前視窗的資料會在新視窗中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。
(2)觸發條件
- watermark < window_end + allowedLateness
- 在[window_begin, window_end)中有late資料存在
7、定時器Timer
Flink Streaming API提供的用於感知並利用處理時間/事件時間變化的機制
(1)在KeyedProcessFunction實現類裡定義定時器為例:
重寫processElement(),對每個輸入元素註冊定時器
重寫onTimer(),定時器觸發時執行的邏輯
根據時間特徵的不同,具體如下:
處理時間——呼叫Context.timerService().registerProcessingTimeTimer()註冊;onTimer()在系統時間戳達到Timer設定的時間戳時觸發。
事件時間——呼叫Context.timerService().registerEventTimeTimer()註冊;onTimer()在Flink內部水印達到或超過Timer設定的時間戳時觸發。
(2)EventTimeTrigger使用Timer實現觸發時間視窗
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { return TriggerResult.FIRE; }
else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; }
}