1. 程式人生 > 其它 >Flink難點解析:揭開Watermark的神祕面紗

Flink難點解析:揭開Watermark的神祕面紗

目錄


Apache Flink稱為 終極流式框架,不僅僅提供高吞吐、低延遲和Exactly-Once語義的實時計算能力,還提供了基於流式引擎處理批量資料的計算能力,真正意義上實現了 批流統一,無疑是繼Spark和Storm的後起之秀。
但是入門Apache Flink,會接觸到 水印水位線陌生的技術詞彙,但到底什麼是水位線,又給Apache Flink蒙上了一層神祕的面紗。這裡就為大家揭開Watermark的神祕面紗。

一、Watermark作用

Apache Flink使用EventTime

處理Stream資料,流處理從Event(事件)產生,流經Source,再到Operator,這中間需要一定的時間。雖然大部分情況下,傳輸到Operator的資料都是按照事件產生的時間順序來的,但是也不排除由於網路延遲、訊息積壓和背壓等原因而導致亂序的產生,特別是使用Kafka的時候,多個分割槽之間的資料無法保證有序。因此,在進行Window計算的時候,不能無限期地等下去,必須有機制保證在特定的時間後, 觸發Window進行計算,即這個機制就是Watermark(水位線)。

二、原理

在Apache Flink的視窗處理過程中,如果確定全部資料到達,就可以對Window的資料做視窗計算操作(如彙總、分組等);如果資料沒有全部到達,則繼續等待該視窗中的資料全部到達才開始處理。這種情況下就需要用到水位線(WaterMarks)機制,它能夠衡量資料處理進度(表達資料到達的完整性),保證事件資料全部到達Flink系統,或者在亂序及延遲到達時,也能夠像預期一樣計算出正確並且連續的結果。

2.1 事件流Watermark計算?

  • Watermark = 事件流最大事件時間(EventTime) - 延遲時間(DelayTime)

2.2 何時觸發視窗函式計算?

  • 視窗[windowStartTime,WindowEndTime)中有資料
  • Watermark >= WindowEndTime

2.3 案例

設定10s時間視窗,按照時間視窗分為[0,10s)[10,20s)等。以[0,10s)為例,那麼視窗startTime=0,endTime=10。

假設有4個事件流資料和EventTime分別為:A-8s, B-12.5s, C-9s, D-13.5s,設定視窗延遲時間為3.5s,也就是Watermark=當前所有到達資料EventTime的最大值-視窗延遲時間,對於遲到的資料,只等你3.5秒。

當A到達的時候,Watermark = max{8} - 3.5 = 4.5 < 10,不觸發計算
當B到達的時候,Watermark = max(12.5, 8) - 3.5 = 12.5-3.5 = 9 < 10,不觸發計算
當C到達的時候,Watermark = max(12.5, 8, 9) - 3.5 = 9 < 10,不觸發計算
當D到達的時候,Watermark = max(13.5, 12.5, 8, 9) - 3.5 = 10 = 10,觸發計算
觸發計算的時候,會將AC(都小於10)都計算進去。

通過Watermark方式,就將遲到的C計算進去了。

三、原始碼分析

package org.apache.flink.streaming.api.windowing.triggers;
/**
 * EventTime時間視窗觸發器
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

	...

	@Override
	public TriggerResult onElement(Object element, long timestamp, 
	                      TimeWindow window, TriggerContext ctx) throws Exception {
	    // 如果視窗最大時間戳 <= 事件流當前水印時間戳,觸發視窗計算
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}
	
	...
	
}