Flink EventTime and Watermarks: A Case Study
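A case study of out-of-order data, a problem that is solved with event time and watermarks.
Explanation:
There are two ways to generate watermarks:
1: With Periodic Watermarks: watermarks are generated and emitted periodically.
2: With Punctuated Watermarks: watermark generation and emission are triggered by certain events.
The first approach is the more common one, so it is the one analyzed here.
It follows the usage of With Periodic Watermarks described in the official documentation.
In the code, the extractTimestamp method extracts the EventTime from the record itself.
The getCurrentWatermark method returns the current watermark, computed as currentMaxTimestamp - maxOutOfOrderness.
Here, maxOutOfOrderness is the maximum out-of-orderness allowed for the data.
So in our program we also implement the AssignerWithPeriodicWatermarks interface.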
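To make the two callbacks concrete without a Flink cluster, here is a minimal plain-Java sketch that mirrors their logic. BoundedOutOfOrdernessTracker is a hypothetical class, not part of Flink's API; in a real job the framework, not your code, decides when getCurrentWatermark() is called.

```java
// Hypothetical plain-Java sketch mirroring extractTimestamp()/getCurrentWatermark(); not Flink's API.
class BoundedOutOfOrdernessTracker {
    private final long maxOutOfOrderness;  // largest tolerated delay, in ms
    private long currentMaxTimestamp = 0L; // largest event time seen so far (starts at 0 like the original)

    BoundedOutOfOrdernessTracker(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Per record: remember the largest event time and return the record's own event time.
    long extractTimestamp(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        return eventTime;
    }

    // Periodically: the watermark trails the largest event time seen by maxOutOfOrderness.
    long getCurrentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessTracker tracker = new BoundedOutOfOrdernessTracker(10_000L);
        // The third record is out of order: it does not move the watermark back.
        for (long eventTime : new long[] {1538359882000L, 1538359899000L, 1538359891000L}) {
            tracker.extractTimestamp(eventTime);
            System.out.println(eventTime + " -> watermark " + tracker.getCurrentWatermark());
        }
    }
}
```

Because the watermark is derived from the running maximum, it never moves backwards, which is exactly what lets late records be recognized later on.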
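1: Implementing the watermark code
1.1: Program description
The program simulates an input stream by reading data from a socket and processes it with map. It then calls assignTimestampsAndWatermarks to extract timestamps and generate watermarks. Finally, it applies a window and prints information to verify when the window fires.
1.2: Code
The complete code is as follows: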
package xuwei.tech.streaming.streamApiDemo;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Watermark example
 *
 * Created by xuwei.tech.
 */
public class StreamingWindowWatermark {

    public static void main(String[] args) throws Exception {
        // Socket port
        int port = 9000;
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time (the default is processing time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Set parallelism to 1 (the default is the number of CPU cores on the machine)
        env.setParallelism(1);
        // Read input lines from the socket
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");
        // Parse each "key,timestamp" line into a Tuple2
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });
        // Extract timestamps and generate watermarks
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L; // maximum allowed out-of-orderness: 10 s
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            /**
             * Defines how the watermark is generated.
             * Called periodically; with event time enabled the default interval is 200 ms.
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            // Defines how the timestamp is extracted from each record
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("key:" + element.f0
                        + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "]"
                        + ",currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "]"
                        + ",watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
                return timestamp;
            }
        });
        // Group by key and aggregate per window
        DataStream<String> window = waterMarkStream.keyBy(0)
                // Assign windows by each record's event time; same effect as calling timeWindow(...)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * Sorts the records in the window so that they are output in time order.
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrayList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrayList.add(next.f1);
                        }
                        Collections.sort(arrayList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        // key, element count, earliest and latest event time, window start, window end
                        String result = key + "," + arrayList.size() + ","
                                + sdf.format(arrayList.get(0)) + "," + sdf.format(arrayList.get(arrayList.size() - 1)) + ","
                                + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });
        // For testing, just print the results to the console
        window.print();
        // Note: Flink is lazy, so nothing above runs until execute() is called
        env.execute("eventtime-watermark");
    }
}
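1.3: Program walkthrough
- Receive data from the socket
- Split each line on the comma and map it to a Tuple2<String, Long>: the first element is the data itself (the key) and the second element is the record's event time
- Extract the timestamp and generate the watermark with a maximum allowed out-of-orderness of 10 s, printing key, eventtime, currentMaxTimestamp and watermark
- Group by key and aggregate over a 3-second window, outputting (key, number of elements in the window, time of the earliest element, time of the latest element, window start time, window end time)
2: Tracking the watermark through the data
Here we focus on the watermark and timestamp values and use the printed output to determine when the window fires.
First, start a socket on hadoop100:9000 and enter the first record, 0001,1538359882000. For easier reading, the printed output is summarized in the table below: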
| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 |
At this point, the watermark already lags currentMaxTimestamp by 10 seconds.
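The formatted times in these tables, and the 10-second lag, can be reproduced from the millisecond values. This is a minimal sketch assuming the machine that produced the tables ran in UTC+8 (Asia/Shanghai): 1538359882000 ms is 02:11:22 UTC on 2018-10-01. The original code's SimpleDateFormat uses the JVM's default time zone, so the printed times depend on where the job runs. EpochFormat is a hypothetical helper.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class EpochFormat {
    // Assumption: UTC+8, matching the times shown in the tables.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
            .withZone(ZoneId.of("Asia/Shanghai"));

    static String format(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long eventTime = 1538359882000L;
        long watermark = eventTime - 10_000L; // currentMaxTimestamp - maxOutOfOrderness
        System.out.println(format(eventTime)); // 2018-10-01 10:11:22.000
        System.out.println(format(watermark)); // 2018-10-01 10:11:12.000
    }
}
```

Pinning the zone explicitly makes the output independent of the machine, which is convenient when comparing runs against these tables.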
Next, we enter 0001,1538359886000 and summarize the output again:
| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 |
We continue with 0001,1538359892000. Summary:
| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 |
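At this point the window still has not fired, even though the watermark now equals the Event Time of the first record. So when exactly does the window fire?
We enter another record, 0001,1538359893000. Summary: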
| Key | Event Time | CurrentMaxTimeStamp | WaterMark |
| --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 |
| 0001 | 1538359893000 | 1538359893000 | 1538359883000 |
| | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:23.000 |
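The window still has not fired. Our data has now reached 2018-10-01 10:11:33.000; measured in event time, 11 seconds have passed since the earliest record, yet the window has not started computing. So when will it fire?
We advance by one more second and enter 0001,1538359894000. Summary: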
| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 | | |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 | | |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 | | |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 | | |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 | | |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 | | |
| 0001 | 1538359893000 | 1538359893000 | 1538359883000 | | |
| | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:23.000 | | |
| 0001 | 1538359894000 | 1538359894000 | 1538359884000 | | |
| | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:24.000 | [10:11:21.000 | 10:11:24.000) |
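Let us pause for an explanation. Windows are laid out in advance along natural time. With a window size of 3 seconds, each minute is divided into the following windows (left-closed, right-open):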
[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:06,00:00:09)
[00:00:09,00:00:12)
[00:00:12,00:00:15)
[00:00:15,00:00:18)
[00:00:18,00:00:21)
[00:00:21,00:00:24)
[00:00:24,00:00:27)
[00:00:27,00:00:30)
[00:00:30,00:00:33)
[00:00:33,00:00:36)
[00:00:36,00:00:39)
[00:00:39,00:00:42)
[00:00:42,00:00:45)
[00:00:45,00:00:48)
[00:00:48,00:00:51)
[00:00:51,00:00:54)
[00:00:54,00:00:57)
[00:00:57,00:01:00)
...
These window boundaries do not depend on the data; they are fixed by the system.
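Which window a record lands in can be computed from its timestamp alone. The sketch below assumes the formula Flink uses in TimeWindow.getWindowStartWithOffset, with offset 0 as for TumblingEventTimeWindows.of(Time.seconds(3)), so windows are aligned to the Unix epoch. TumblingWindowAssigner is a hypothetical name.

```java
class TumblingWindowAssigner {
    // Start of the tumbling window that contains `timestamp`.
    // Assumed to match Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, size).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 3_000L; // Time.seconds(3)
        for (long ts : new long[] {1538359882000L, 1538359886000L}) {
            long start = windowStart(ts, 0L, size);
            // Windows are left-closed, right-open: [start, start + size)
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

The first record maps to [1538359881000, 1538359884000), which is [10:11:21, 10:11:24) in UTC+8, and the second maps to [10:11:24, 10:11:27).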
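Each incoming record is assigned, by its own Event Time, to one of these windows. A window that contains data fires once the watermark reaches that window's window_end_time; the watermark is compared against the end of the window the record belongs to, not against the record's own Event Time.
In the test above, after the last record arrived, the watermark rose to 10:11:24. That is exactly the window_end_time of the window [10:11:21, 10:11:24) holding the earliest record, so that window fired.
To verify this trigger mechanism, we continue with 0001,1538359896000. Summary: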
| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 | | |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 | | |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 | | |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 | | |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 | | |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 | | |
| 0001 | 1538359893000 | 1538359893000 | 1538359883000 | | |
| | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:23.000 | | |
| 0001 | 1538359894000 | 1538359894000 | 1538359884000 | | |
| | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:24.000 | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 | 1538359896000 | 1538359886000 | | |
| | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:26.000 | | |
Now the watermark has reached the second record's event time (10:11:26), but not the end time of the window that record belongs to, so the window does not fire. The second record's window is:
[10:11:24,10:11:27)
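The event time needed to fire this window follows from the firing rule. This sketch assumes Flink's EventTimeTrigger semantics, where a window fires once the watermark reaches window.maxTimestamp(), that is window_end_time - 1 ms, together with the 10-second bound used above. FiringThreshold is a hypothetical name.

```java
class FiringThreshold {
    // Assumption: EventTimeTrigger semantics, firing once watermark >= window.maxTimestamp() = end - 1.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // Smallest maximum event time whose watermark (maxEventTime - maxOutOfOrderness) fires the window.
    static long minEventTimeToFire(long windowEnd, long maxOutOfOrderness) {
        return windowEnd - 1 + maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long windowEnd = 1538359887000L; // end of [10:11:24, 10:11:27)
        long bound = 10_000L;            // maxOutOfOrderness
        System.out.println(minEventTimeToFire(windowEnd, bound));     // 1538359896999
        System.out.println(fires(windowEnd, 1538359896000L - bound)); // false (record 10:11:36)
        System.out.println(fires(windowEnd, 1538359897000L - bound)); // true  (record 10:11:37)
    }
}
```

With whole-second inputs, the first record that can fire [10:11:24, 10:11:27) is therefore one stamped 10:11:37.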
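In other words, the watermark must reach 10:11:27 before the second record's window fires, which with a 10-second bound requires a record with event time 10:11:37. We enter 0001,1538359897000. Summary: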
| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 | | |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 | | |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 | | |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 | | |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 | | |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 | | |
| 0001 | 1538359893000 | 1538359893000 | 1538359883000 | | |
| | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:23.000 | | |
| 0001 | 1538359894000 | 1538359894000 | 1538359884000 | | |
| | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:24.000 | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 | 1538359896000 | 1538359886000 | | |
| | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:26.000 | | |
| 0001 | 1538359897000 | 1538359897000 | 1538359887000 | | |
| | 2018-10-01 10:11:37.000 | 2018-10-01 10:11:37.000 | 2018-10-01 10:11:27.000 | [10:11:24.000 | 10:11:27.000) |
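We can now see that a window fires only when the following conditions hold:
1. watermark >= window_end_time (strictly, Flink's event-time trigger fires once the watermark reaches window_end_time - 1 ms, the window's maxTimestamp(); with whole-second data, as here, the two are the same)
2. Data exists in [window_start_time, window_end_time); note that the interval is left-closed, right-open
The window fires only when both conditions are satisfied.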
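Both conditions can be checked by replaying the records entered so far. The following is a simplified single-key, parallelism-1 model, not Flink code. It assumes the watermark is refreshed right after each record, which is reasonable here because records are typed by hand, far apart compared with the periodic interval. It also assumes late records are dropped, as Flink does by default with zero allowed lateness. WindowTriggerSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified single-key, parallelism-1 model of the job above; not Flink code.
// Assumption: the watermark is recomputed right after every record.
class WindowTriggerSimulation {
    static final long WINDOW_SIZE = 3_000L;           // Time.seconds(3)
    static final long MAX_OUT_OF_ORDERNESS = 10_000L; // 10 s, as in the assigner

    // Replays the records and returns the windows that fired, as "[start,end):count".
    static List<String> run(long[] eventTimes) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> record count
        List<String> fired = new ArrayList<>();
        long currentMaxTimestamp = 0L;
        long watermark = Long.MIN_VALUE;

        for (long ts : eventTimes) {
            long start = ts - (ts + WINDOW_SIZE) % WINDOW_SIZE; // epoch-aligned window start
            // A record whose window has already fired is late and dropped (zero allowed lateness).
            if (start + WINDOW_SIZE - 1 > watermark) {
                openWindows.merge(start, 1, Integer::sum);
            }
            // The timestamp assigner sees every record, late or not.
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            watermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
            // Condition 2 holds for every key in openWindows; check condition 1 (watermark >= end - 1).
            while (!openWindows.isEmpty() && openWindows.firstKey() + WINDOW_SIZE - 1 <= watermark) {
                Map.Entry<Long, Integer> w = openWindows.pollFirstEntry();
                fired.add("[" + w.getKey() + "," + (w.getKey() + WINDOW_SIZE) + "):" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] inOrder = {1538359882000L, 1538359886000L, 1538359892000L, 1538359893000L,
                          1538359894000L, 1538359896000L, 1538359897000L};
        // Prints [[1538359881000,1538359884000):1, [1538359884000,1538359887000):1]
        System.out.println(run(inOrder));
    }
}
```

The empty window [10:11:27, 10:11:30) never appears in the result, because no record ever created it; this is condition 2 at work.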
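3: Handling out-of-order data with watermark + window
In the tests above, the records arrived in increasing time order. Now we enter some out-of-order (late) records to see how watermarks and windows work together to handle them.
We enter two records, 0001,1538359899000 followed by 0001,1538359891000. Summary: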
| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 | | |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 | | |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 | | |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 | | |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 | | |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 | | |
| 0001 | 1538359893000 | 1538359893000 | 1538359883000 | | |
| | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:23.000 | | |
| 0001 | 1538359894000 | 1538359894000 | 1538359884000 | | |
| | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:24.000 | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 | 1538359896000 | 1538359886000 | | |
| | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:26.000 | | |
| 0001 | 1538359897000 | 1538359897000 | 1538359887000 | | |
| | 2018-10-01 10:11:37.000 | 2018-10-01 10:11:37.000 | 2018-10-01 10:11:27.000 | [10:11:24.000 | 10:11:27.000) |
| 0001 | 1538359899000 | 1538359899000 | 1538359889000 | | |
| | 2018-10-01 10:11:39.000 | 2018-10-01 10:11:39.000 | 2018-10-01 10:11:29.000 | | |
| 0001 | 1538359891000 | 1538359899000 | 1538359889000 | | |
| | 2018-10-01 10:11:31.000 | 2018-10-01 10:11:39.000 | 2018-10-01 10:11:29.000 | | |
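Although we entered a record with time 10:11:31, neither currentMaxTimestamp nor the watermark changed. Applying the conditions above:
1. watermark >= window_end_time
2. Data exists in [window_start_time, window_end_time)
The 10:11:31 record falls into the window [10:11:30, 10:11:33). Since watermark (10:11:29) < window_end_time (10:11:33), the window cannot fire yet.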
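The same check also explains why the out-of-order record is still accepted: its window has not fired yet. This is a minimal sketch under the same assumptions as above: 3-second windows aligned to the epoch, firing at window_end_time - 1 ms, and records dropped as late (Flink's default with zero allowed lateness) once their window's maxTimestamp is covered by the current watermark. LateRecordCheck is a hypothetical name.

```java
class LateRecordCheck {
    static final long WINDOW_SIZE = 3_000L; // Time.seconds(3)

    // End (exclusive) of the epoch-aligned tumbling window that contains eventTime.
    static long windowEnd(long eventTime) {
        long start = eventTime - (eventTime + WINDOW_SIZE) % WINDOW_SIZE;
        return start + WINDOW_SIZE;
    }

    // True once the watermark has reached the window's maxTimestamp (end - 1):
    // the window fires at that point, and a record for it arriving afterwards is late.
    static boolean windowClosed(long eventTime, long watermark) {
        return watermark >= windowEnd(eventTime) - 1;
    }

    public static void main(String[] args) {
        long outOfOrder = 1538359891000L; // 10:11:31, window [10:11:30, 10:11:33)
        long watermark = 1538359889000L;  // 10:11:29 when the record arrives
        System.out.println(windowEnd(outOfOrder));                              // 1538359893000
        System.out.println(windowClosed(outOfOrder, watermark));                // false: accepted, not fired
        System.out.println(windowClosed(outOfOrder, 1538359903000L - 10_000L)); // true after a 10:11:43 record
    }
}
```

By the same check, a hypothetical record stamped 10:11:25 arriving at this point would be dropped, because its window [10:11:24, 10:11:27) has already fired.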
If we now enter a record at 10:11:43 (0001,1538359903000), the watermark rises to 10:11:33, so this window should definitely fire. Let's try it. Summary:
| Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
| --- | --- | --- | --- | --- | --- |
| 0001 | 1538359882000 | 1538359882000 | 1538359872000 | | |
| | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:22.000 | 2018-10-01 10:11:12.000 | | |
| 0001 | 1538359886000 | 1538359886000 | 1538359876000 | | |
| | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:26.000 | 2018-10-01 10:11:16.000 | | |
| 0001 | 1538359892000 | 1538359892000 | 1538359882000 | | |
| | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:32.000 | 2018-10-01 10:11:22.000 | | |
| 0001 | 1538359893000 | 1538359893000 | 1538359883000 | | |
| | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 2018-10-01 10:11:23.000 | | |
| 0001 | 1538359894000 | 1538359894000 | 1538359884000 | | |
| | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:34.000 | 2018-10-01 10:11:24.000 | [10:11:21.000 | 10:11:24.000) |
| 0001 | 1538359896000 | 1538359896000 | 1538359886000 | | |
| | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:36.000 | 2018-10-01 10:11:26.000 | | |
| 0001 | 1538359897000 | 1538359897000 | 1538359887000 | | |
| | 2018-10-01 10:11:37.000 | 2018-10-01 10:11:37.000 | 2018-10-01 10:11:27.000 | [10:11:24.000 | 10:11:27.000) |
| 0001 | 1538359899000 | 1538359899000 | 1538359889000 | | |
| | 2018-10-01 10:11:39.000 | 2018-10-01 10:11:39.000 | 2018-10-01 10:11:29.000 | | |
| 0001 | 1538359891000 | 1538359899000 | 1538359889000 | | |
|