Flink EventTime and Watermarks: A Case Study

Explanation:

This case study deals with out-of-order data, which is handled with event time and watermarks.

There are two ways to generate watermarks:

1: With Periodic Watermarks: watermarks are generated and emitted periodically

2: With Punctuated Watermarks: watermarks are generated and emitted when triggered by certain events

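The first approach is the more common one, so it is the one analyzed here.

It follows the usage of With Periodic Watermarks shown in the official documentation.

In the code, the extractTimestamp method extracts the event time from the record itself.

The getCurrentWatermark method returns the current watermark, computed as currentMaxTimestamp - maxOutOfOrderness.

Here maxOutOfOrderness is the maximum out-of-orderness allowed for the data.

Accordingly, the example here also implements the AssignerWithPeriodicWatermarks interface.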
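The rule described above can be tried out without Flink. The following is only a minimal plain-Java sketch: the class name PeriodicWatermarkSketch and the method onEvent are made up for illustration and are not part of the Flink API, and the timestamps are sample values.

```java
// Plain-Java sketch (no Flink dependency) of the periodic watermark rule:
// watermark = largest event time seen so far - allowed out-of-orderness.
// Class and method names are hypothetical, chosen only for this illustration.
final class PeriodicWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = 0L;

    PeriodicWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Plays the role of extractTimestamp: remember the largest event time observed.
    long onEvent(long eventTimeMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMs);
        return eventTimeMs;
    }

    // Plays the role of getCurrentWatermark.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch w = new PeriodicWatermarkSketch(10_000L);
        w.onEvent(1538359882000L);
        System.out.println(w.currentWatermark()); // 1538359872000
        w.onEvent(1538359880000L);                // an older, out-of-order event
        System.out.println(w.currentWatermark()); // still 1538359872000
    }
}
```

An event older than the current maximum leaves the watermark where it was, so the watermark never moves backwards.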

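1: Implementing the watermark code

1.1: Program description

The program receives simulated data from a socket, processes it with map, and then calls assignTimestampsAndWatermarks to extract timestamps and generate watermarks. Finally it applies a window and prints the result, which shows when the window fires.

1.2: The code

The complete code is as follows: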

package xuwei.tech.streaming.streamApiDemo;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;


/**
 *
 * Watermark example
 *
 * Created by xuwei.tech.
 */
public class StreamingWindowWatermark {

    public static void main(String[] args) throws Exception {
        // Port number of the socket
        int port = 9000;
        // Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time; the default is processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set parallelism to 1; by default it is the number of CPU cores of the current machine
        env.setParallelism(1);

        // Connect to the socket to read the input data
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        // Parse the input data
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        // Extract timestamps and generate watermarks
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness: 10 s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            /**
             * Defines how the watermark is generated.
             * Called periodically at the auto-watermark interval (200 ms by default once event time is enabled).
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            // Define how the timestamp is extracted
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                        sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                return timestamp;
            }
        });

        // Group by key and aggregate
        DataStream<String> window = waterMarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3))) // assign windows by each record's event time; same effect as calling timeWindow
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * Sorts the data inside the window so that it is emitted in order
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });
        // For testing, simply print the result to the console
        window.print();

        // Note: Flink evaluates lazily, so execute() must be called for the code above to run
        env.execute("eventtime-watermark");

    }



}

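1.3: Program walkthrough

  1. Receive data from the socket.
  2. Split each line on the comma and use map to convert it into a Tuple2<String, Long>. The first element of the tuple is the data itself (the key), and the second element is the record's event time.
  3. Extract the timestamp and generate the watermark with a maximum allowed out-of-orderness of 10 s, and print key, eventtime, currentMaxTimestamp, and watermark.
  4. Group and aggregate with a window size of 3 seconds, and output (key, number of elements in the window, time of the earliest element in the window, time of the latest element in the window, window start time, window end time).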
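Step 2 and the timestamp formatting used in the printed output can be sketched in isolation as follows. This is an illustrative plain-Java snippet, not part of the program above: the class RecordParsingSketch is hypothetical, the input line is a sample in the key,timestamp format, and the GMT+8 time zone is an assumption chosen so that the formatted values match the times shown later in this article.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper showing how a "key,eventTimeMillis" line is split
// and how an epoch-millisecond timestamp is rendered for printing.
final class RecordParsingSketch {

    // First field: the key.
    static String key(String line) {
        return line.split(",")[0];
    }

    // Second field: the event time in epoch milliseconds.
    static long eventTime(String line) {
        return Long.parseLong(line.split(",")[1]);
    }

    // Formats epoch milliseconds with the same pattern as the program above.
    // Assumption: GMT+8, so the result does not depend on the local time zone.
    static String format(long epochMs) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        return sdf.format(new Date(epochMs));
    }

    public static void main(String[] args) {
        String line = "0001,1538359882000"; // sample input line
        System.out.println(key(line));               // 0001
        System.out.println(eventTime(line));         // 1538359882000
        System.out.println(format(eventTime(line))); // 2018-10-01 10:11:22.000
    }
}
```

The program above creates its SimpleDateFormat without an explicit time zone, so its printed times depend on the time zone of the machine it runs on.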

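2: Tracking watermark times through the data

The focus here is on the watermark and timestamp values; the program's output is used to determine when a window fires.

First, open the socket and enter the first record.

For easier reading, the values printed by the program are collected in a table: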

| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |

At this point the watermark is already 10 seconds behind currentMaxTimestamp. Entering another record and summarizing the output again gives the following table:

| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |

Entering one more record gives the following summary:

| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |
| 0001 | 1538359892000 (2018-10-01 10:11:32.000) | 1538359892000 (2018-10-01 10:11:32.000) | 1538359882000 (2018-10-01 10:11:22.000) |

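At this point the window still has not fired, even though the watermark now equals the Event Time of the first record. So when exactly does the window fire?

We enter another record; the summary is: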

| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |
| 0001 | 1538359892000 (2018-10-01 10:11:32.000) | 1538359892000 (2018-10-01 10:11:32.000) | 1538359882000 (2018-10-01 10:11:22.000) |
| 0001 | 1538359893000 (2018-10-01 10:11:33.000) | 1538359893000 (2018-10-01 10:11:33.000) | 1538359883000 (2018-10-01 10:11:23.000) |

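The window still has not fired. The data has now reached 2018-10-01 10:11:33.000; measured in event time, the earliest record is already 11 seconds old, yet the window has not been computed. So when will the window fire?

We advance by one more second and enter another record; the summary is: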

| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |  |  |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |  |  |
| 0001 | 1538359892000 (2018-10-01 10:11:32.000) | 1538359892000 (2018-10-01 10:11:32.000) | 1538359882000 (2018-10-01 10:11:22.000) |  |  |
| 0001 | 1538359893000 (2018-10-01 10:11:33.000) | 1538359893000 (2018-10-01 10:11:33.000) | 1538359883000 (2018-10-01 10:11:23.000) |  |  |
| 0001 | 1538359894000 (2018-10-01 10:11:34.000) | 1538359894000 (2018-10-01 10:11:34.000) | 1538359884000 (2018-10-01 10:11:24.000) | [10:11:21.000 | 10:11:24.000) |

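A note on what has happened so far: windows are first laid out along natural time. With a window size of 3 seconds, one minute is divided into the following windows, each closed on the left and open on the right: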

[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:06,00:00:09)
[00:00:09,00:00:12)
[00:00:12,00:00:15)
[00:00:15,00:00:18)
[00:00:18,00:00:21)
[00:00:21,00:00:24)
[00:00:24,00:00:27)
[00:00:27,00:00:30)
[00:00:30,00:00:33)
[00:00:33,00:00:36)
[00:00:36,00:00:39)
[00:00:39,00:00:42)
[00:00:42,00:00:45)
[00:00:45,00:00:48)
[00:00:48,00:00:51)
[00:00:51,00:00:54)
[00:00:54,00:00:57)
[00:00:57,00:01:00)
...

How the windows are laid out does not depend on the data; it is fixed by the system.
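The alignment can be reproduced with simple modular arithmetic. The sketch below is a plain-Java illustration under the assumption that windows are aligned to epoch 0 with no offset; the class TumblingWindowSketch is hypothetical and not a Flink class.

```java
// Hypothetical helper: computes the tumbling window [start, end) that a
// timestamp falls into, assuming windows are aligned to epoch 0 (no offset).
final class TumblingWindowSketch {

    // Inclusive start of the window containing the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end of the window containing the timestamp.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 3_000L; // 3-second windows, as in the program above
        long ts = 1538359882000L; // sample event time (10:11:22 in GMT+8)
        System.out.println(windowStart(ts, size)); // 1538359881000
        System.out.println(windowEnd(ts, size));   // 1538359884000
    }
}
```

For the sample timestamp, the result is the window from 10:11:21 to 10:11:24, and it depends only on the timestamp and the window size, not on which other records have arrived.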

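Each incoming record is assigned to a window according to its own Event Time. A window that contains data becomes a candidate for firing as the watermark advances past its records, but what finally decides the firing is the window_end_time of the window that the record's Event Time falls into, not the Event Time itself.

In the test above, once the last record arrived the watermark rose to 10:11:24, exactly the window_end_time of the window containing the earliest record, so that window fired.

To verify this trigger mechanism, we enter more data; the summary is: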

| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |  |  |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |  |  |
| 0001 | 1538359892000 (2018-10-01 10:11:32.000) | 1538359892000 (2018-10-01 10:11:32.000) | 1538359882000 (2018-10-01 10:11:22.000) |  |  |
| 0001 | 1538359893000 (2018-10-01 10:11:33.000) | 1538359893000 (2018-10-01 10:11:33.000) | 1538359883000 (2018-10-01 10:11:23.000) |  |  |
| 0001 | 1538359894000 (2018-10-01 10:11:34.000) | 1538359894000 (2018-10-01 10:11:34.000) | 1538359884000 (2018-10-01 10:11:24.000) | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 (2018-10-01 10:11:36.000) | 1538359896000 (2018-10-01 10:11:36.000) | 1538359886000 (2018-10-01 10:11:26.000) |  |  |

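At this point the watermark has reached the event time of the second record, but it has not reached the end time of the window containing that record, so the window has not fired. The window containing the second record is:

[10:11:24,10:11:27)

In other words, the window containing the second record fires only once the watermark reaches 10:11:27, which requires a record with event time 10:11:37. We enter it; the summary is: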

| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |  |  |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |  |  |
| 0001 | 1538359892000 (2018-10-01 10:11:32.000) | 1538359892000 (2018-10-01 10:11:32.000) | 1538359882000 (2018-10-01 10:11:22.000) |  |  |
| 0001 | 1538359893000 (2018-10-01 10:11:33.000) | 1538359893000 (2018-10-01 10:11:33.000) | 1538359883000 (2018-10-01 10:11:23.000) |  |  |
| 0001 | 1538359894000 (2018-10-01 10:11:34.000) | 1538359894000 (2018-10-01 10:11:34.000) | 1538359884000 (2018-10-01 10:11:24.000) | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 (2018-10-01 10:11:36.000) | 1538359896000 (2018-10-01 10:11:36.000) | 1538359886000 (2018-10-01 10:11:26.000) |  |  |
| 0001 | 1538359897000 (2018-10-01 10:11:37.000) | 1538359897000 (2018-10-01 10:11:37.000) | 1538359887000 (2018-10-01 10:11:27.000) | [10:11:24.000 | 10:11:27.000) |

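We can now see that a window fires only when the following conditions hold:

1. watermark >= window_end_time
2. There is data in the interval [window_start_time, window_end_time); note that the interval is closed on the left and open on the right

The window fires only when both conditions are satisfied.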
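The two conditions can be checked against the records entered so far with a small simulation. This is only a plain-Java sketch under simplifying assumptions: a single key, the watermark recomputed on every element instead of periodically, and the firing rule exactly as stated above (Flink's built-in event-time trigger compares the watermark with the last millisecond of the window, which makes no difference at this one-second granularity). The class WindowTriggerSimulation and its output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key simulation of "fire when watermark >= window end
// and the window contains data". Not Flink code.
final class WindowTriggerSimulation {
    static final long WINDOW_SIZE_MS = 3_000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 10_000L;

    // Feeds event times in arrival order and returns one line per fired window.
    static List<String> run(long[] eventTimes) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> element count
        List<String> fired = new ArrayList<>();
        long currentMax = 0L;
        for (long ts : eventTimes) {
            currentMax = Math.max(currentMax, ts);
            long watermark = currentMax - MAX_OUT_OF_ORDERNESS_MS;
            long start = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
            // Condition 2: only windows that actually received data are tracked.
            if (start + WINDOW_SIZE_MS > watermark) {
                openWindows.merge(start, 1, Integer::sum);
            }
            // Condition 1: fire every tracked window whose end <= watermark.
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + WINDOW_SIZE_MS <= watermark) {
                Map.Entry<Long, Integer> e = openWindows.pollFirstEntry();
                long end = e.getKey() + WINDOW_SIZE_MS;
                fired.add("[" + e.getKey() + "," + end + ") size=" + e.getValue()
                        + " firedBy=" + ts);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = {
            1538359882000L, 1538359886000L, 1538359892000L, 1538359893000L,
            1538359894000L, 1538359896000L, 1538359897000L
        };
        // Prints:
        // [1538359881000,1538359884000) size=1 firedBy=1538359894000
        // [1538359884000,1538359887000) size=1 firedBy=1538359897000
        run(events).forEach(System.out::println);
    }
}
```

With the seven event times used so far, the simulation fires the first window on the 10:11:34 record and the second on the 10:11:37 record, matching the tables above.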

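3: Handling out-of-order data with watermark + window

In the tests above the data arrived in increasing time order. Now we enter some out-of-order (late) data to see how watermarks combined with windows handle it.

Enter two records; the summary is: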

| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 (2018-10-01 10:11:22.000) | 1538359882000 (2018-10-01 10:11:22.000) | 1538359872000 (2018-10-01 10:11:12.000) |  |  |
| 0001 | 1538359886000 (2018-10-01 10:11:26.000) | 1538359886000 (2018-10-01 10:11:26.000) | 1538359876000 (2018-10-01 10:11:16.000) |  |  |
| 0001 | 1538359892000 (2018-10-01 10:11:32.000) | 1538359892000 (2018-10-01 10:11:32.000) | 1538359882000 (2018-10-01 10:11:22.000) |  |  |
| 0001 | 1538359893000 (2018-10-01 10:11:33.000) | 1538359893000 (2018-10-01 10:11:33.000) | 1538359883000 (2018-10-01 10:11:23.000) |  |  |
| 0001 | 1538359894000 (2018-10-01 10:11:34.000) | 1538359894000 (2018-10-01 10:11:34.000) | 1538359884000 (2018-10-01 10:11:24.000) | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 (2018-10-01 10:11:36.000) | 1538359896000 (2018-10-01 10:11:36.000) | 1538359886000 (2018-10-01 10:11:26.000) |  |  |
| 0001 | 1538359897000 (2018-10-01 10:11:37.000) | 1538359897000 (2018-10-01 10:11:37.000) | 1538359887000 (2018-10-01 10:11:27.000) | [10:11:24.000 | 10:11:27.000) |
| 0001 | 1538359899000 (2018-10-01 10:11:39.000) | 1538359899000 (2018-10-01 10:11:39.000) | 1538359889000 (2018-10-01 10:11:29.000) |  |  |
| 0001 | 1538359891000 (2018-10-01 10:11:31.000) | 1538359899000 (2018-10-01 10:11:39.000) | 1538359889000 (2018-10-01 10:11:29.000) |  |  |

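As we can see, although we entered a record with time 10:11:31, neither currentMaxTimestamp nor the watermark changed. Applying the conditions stated above:

1. watermark >= window_end_time
2. There is data in [window_start_time, window_end_time)

the watermark (10:11:29) is less than window_end_time (10:11:33), so the window cannot fire.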
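The same comparison can be written as a small check that tells whether an out-of-order record still lands in a window that has not fired. This is a plain-Java sketch that uses the article's rule (watermark >= window_end_time) as a simplifying assumption; the class OutOfOrderCheckSketch is hypothetical. In Flink, a record whose window has already fired is considered late, and how it is treated depends on the allowed-lateness setting, which this example does not configure.

```java
// Hypothetical check: does an out-of-order record still fall into a window
// that has not fired yet, given the current watermark?
final class OutOfOrderCheckSketch {
    static final long WINDOW_SIZE_MS = 3_000L;

    // Exclusive end of the 3-second window containing the timestamp.
    static long windowEnd(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
    }

    // True while the watermark has not reached the end of the record's window.
    static boolean windowStillOpen(long eventTimeMs, long watermarkMs) {
        return watermarkMs < windowEnd(eventTimeMs);
    }

    public static void main(String[] args) {
        long watermark = 1538359899000L - 10_000L; // max event time 10:11:39 minus 10 s
        long outOfOrder = 1538359891000L;          // the 10:11:31 record
        System.out.println(windowEnd(outOfOrder));                  // 1538359893000
        System.out.println(windowStillOpen(outOfOrder, watermark)); // true
        // A record at 10:11:25 would belong to a window that has already fired.
        System.out.println(windowStillOpen(1538359885000L, watermark)); // false
    }
}
```

The 10:11:31 record is therefore accepted into its window, which is still waiting for the watermark to reach 10:11:33.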

If we now enter a record with time 10:11:43, the watermark rises to 10:11:33 and the window will certainly fire. Let's try it; the summary is:

Key

Event Time

CurrentMaxTimeStamp

WaterMark

window_start_time

window_end_time

0001

1538359882000

1538359882000

1538359872000

2018-10-01 10:11:22.000

2018-10-01 10:11:22.000

2018-10-01 10:11:12.000

0001

1538359886000

1538359886000

1538359876000

2018-10-01 10:11:26.000

2018-10-01 10:11:26.000

2018-10-01 10:11:16.000

0001

1538359892000

1538359892000

1538359882000

2018-10-01 10:11:32.000

2018-10-01 10:11:32.000

2018-10-01 10:11:22.000

0001

1538359893000

1538359893000

1538359883000

2018-10-01 10:11:33.000

2018-10-01 10:11:33.000

2018-10-01 10:11:23.000

0001

1538359894000

1538359894000

1538359884000

2018-10-01 10:11:34.000

2018-10-01 10:11:34.000

2018-10-01 10:11:24.000

[10:11:21.000

10:11:24.000)

0001

1538359896000

1538359896000

1538359886000

2018-10-01 10:11:36.000

2018-10-01 10:11:36.000

2018-10-01 10:11:26.000

0001

1538359897000

1538359897000

1538359887000

2018-10-01 10:11:37.000

2018-10-01 10:11:37.000

2018-10-01 10:11:27.000

[10:11:24.000

10:11:27.000)

0001

1538359899000

1538359899000

1538359889000

2018-10-01 10:11:39.000

2018-10-01 10:11:39.000

2018-10-01 10:11:29.000

0001

1538359891000

1538359899000

1538359889000