1. 程式人生 > 其它 >Flink中Watermark定時生成原始碼分析

Flink中Watermark定時生成原始碼分析

watermark的生成策略有兩種:一種是週期性生成,另外一種是根據特定標記生成。在實際使用中大多數情況下會選擇週期性生成方式也就是AssignerWithPeriodicWatermarks方式,使用方式如下:

//指定為evenTime時間語義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//生成watermark的週期
env.getConfig.setAutoWatermarkInterval(watermarkInterval)
//指定方式
dataStream.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) { override def extractTimestamp(element: Element): Long = element.dT })

assignTimestampsAndWatermarks 可以理解為是一個運算元轉換操作,等同於map/window一樣理解,可以為其設定並行度、名稱,也是一個transformation/operator,

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
      AssignerWithPeriodicWatermarks
<T> timestampAndWatermarkAssigner) { final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner); TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) .setParallelism(inputParallelism); }

在生成的jobGraph中,也是作為其中的一部分:

預設的名稱就是 Timestamps/Watermarks。
接下來深入分析其使用的StreamOperator型別TimestampsAndPeriodicWatermarksOperator,其繼承了AbstractUdfStreamOperator,實現了OneInputStreamOperator介面與ProcessingTimeCallback介面,具體包含的方法:
open方法:

public void open() throws Exception {
    super.open();
    //初始化預設當前watermark
    currentWatermark = Long.MIN_VALUE;
    //生成watermark週期時間配置
    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    //註冊定時其配置
    if (watermarkInterval > 0) {
      long now = getProcessingTimeService().getCurrentProcessingTime();
      getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
  }

最重要的就是getProcessingTimeService().registerTimer註冊一個watermarkInterval後觸發的定時器,傳入回撥引數是this,也就是會呼叫當前物件的onProcessingTime方法(關於這部分知識可以檢視Flink的定時系列)。
processElement方法:

public void processElement(StreamRecord<T> element) throws Exception {
    final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
        element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

    output.collect(element.replace(element.getValue(), newTimestamp));
  }
提取當前的事件時間,在BoundedOutOfOrdernessTimestampExtractor中會儲存當前最大的事件時間。

onProcessingTime方法:

public void onProcessingTime(long timestamp) throws Exception {    // register next timer
    Watermark newWatermark = userFunction.getCurrentWatermark();
    //當新的watermark大於當前的watermark
    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
      currentWatermark = newWatermark.getTimestamp();
      //將符合要求的watermark傳送出去
      output.emitWatermark(newWatermark);
    }
    //註冊下一次觸發時間
    long now = getProcessingTimeService().getCurrentProcessingTime();
    getProcessingTimeService().registerTimer(now + watermarkInterval, this);
  }
該方法表示的就是定時回撥的方法,將符合要求的watermark傳送出去並且註冊下一個定時器。另外該方法與processElement方法是兩個互斥的方法,內部使用了同一把鎖做同步控制。

processWatermark方法:

public void processWatermark(Watermark mark) throws Exception {
    if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
      currentWatermark = Long.MAX_VALUE;
      output.emitWatermark(mark);
    }
  }
用來處理上游傳送過來的watermark,可以認為不做任何處理,下游的watermark只與其上游最近的生成方式相關。