Flink中Watermark定時生成原始碼分析
阿新 • • 發佈:2021-07-26
watermark的生成策略有兩種:一種是週期性生成,另外一種是根據特定標記生成。在實際使用中大多數情況下會選擇週期性生成方式也就是AssignerWithPeriodicWatermarks方式,使用方式如下:
//指定為evenTime時間語義 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //生成watermark的週期 env.getConfig.setAutoWatermarkInterval(watermarkInterval) //指定方式 dataStream.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) { override def extractTimestamp(element: Element): Long = element.dT })
assignTimestampsAndWatermarks 可以理解為是一個運算元轉換操作,等同於map/window一樣理解,可以為其設定並行度、名稱,也是一個transformation/operator,
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) { final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner); TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) .setParallelism(inputParallelism); }
在生成的jobGraph中,也是作為其中的一部分:
預設的名稱就是 Timestamps/Watermarks。
接下來深入分析其使用的StreamOperator型別TimestampsAndPeriodicWatermarksOperator,其繼承了AbstractUdfStreamOperator,實現了OneInputStreamOperator介面與ProcessingTimeCallback介面,具體包含的方法:
open方法:
public void open() throws Exception { super.open(); //初始化預設當前watermark currentWatermark = Long.MIN_VALUE; //生成watermark週期時間配置 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); //註冊定時其配置 if (watermarkInterval > 0) { long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } }
最重要的就是getProcessingTimeService().registerTimer註冊一個watermarkInterval後觸發的定時器,傳入回撥引數是this,也就是會呼叫當前物件的onProcessingTime方法(關於這部分知識可以檢視Flink的定時系列)。
processElement方法:
public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); }
提取當前的事件時間,在BoundedOutOfOrdernessTimestampExtractor中會儲存當前最大的事件時間。
onProcessingTime方法:
public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); //當新的watermark大於當前的watermark if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); //將符合要求的watermark傳送出去 output.emitWatermark(newWatermark); } //註冊下一次觸發時間 long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
該方法表示的就是定時回撥的方法,將符合要求的watermark傳送出去並且註冊下一個定時器。另外該方法與processElement方法是兩個互斥的方法,內部使用了同一把鎖做同步控制。
processWatermark方法:
public void processWatermark(Watermark mark) throws Exception { if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); } }
用來處理上游傳送過來的watermark,可以認為不做任何處理,下游的watermark只與其上游最近的生成方式相關。