1. 程式人生 > >Flink系列之Time和WaterMark

Flink系列之Time和WaterMark

  當資料進入Flink的時候,資料需要帶入相應的時間,根據相應的時間進行處理。

  讓咱們想象一個場景,有一個佇列,分別帶著指定的時間,那麼處理的時候,需要根據相應的時間進行處理,比如:統計最近五分鐘的訪問量,那麼就需要知道資料到來的時間。五分鐘以內的資料將會被計入,超過五分鐘的將會計入下一個計算視窗。

  那麼Flink的Time分為三種:

  ProcessingTime : 處理時間,即運算元處理資料的機器產生的時間,資料的流程一般是Source -> Transform (Operator,即運算元) -> Sink(儲存資料)。ProcessingTime出現在Transform,運算元計算的點。這個比較好生成,資料一旦到了運算元就會生成。如果在分散式系統中或非同步系統中,對於一個訊息系統,有的機器處理的塊,有的機器消費的慢,運算元和運算元之間的處理速度,還有可能有些機器出故障了,ProcessingTime將不能根據時間產生決定資料的結果,因為什麼時候到了運算元才會生成這個時間。

  EventTime : 事件時間,此事件一般是產生資料的源頭生成的。帶著event time的事件進入flink的source之後就可以把事件事件進行提取,提取出來之後可以根據這個時間處理需要一致性和決定性的結果。比如,計算一個小時或者五分鐘內的資料訪問量。當然資料的EventTime可以是有序的,也可以是無序的。有序的資料大家比較好理解,比如,第一秒到第一條,第二秒到第二條資料。無序的資料,舉個例子要計算五秒的資料,假如現在為10:00:00, 那麼資料EventTime在[10:00:00 10:00:05), [10:00:05 10:00:10),加入一條資料是04秒產生的,那麼由於機器處理的慢,該資料在08秒的時候到了,這個時候我們理解該資料就是無序的。可以通過WaterMark的機制處理無序的資料,一會兒咱們在文章中繼續解釋。

  IngestionTime : 攝入時間,即資料進入Flink的Source的時候計入的時間。相對於以上兩個時間,IngestionTime 介於 ProcessingTime 和 EventTime之間,相比於ProcessingTime,生成的更加方便快捷,ProcessingTime每次進入一個Operator(運算元,即map、flatMap、reduce等)都會產生一個時間,而IngestionTime在進入Flink的時候就產生了timestamp。相比於eventTime,它不能處理無序的事件,因為每次進入source產生的時間都是有序的,IngestionTime也無須產生WaterMark,因為會自動生成。

  如果大家還不是特別理解的話,咱們從官網拿一張圖來展示,這個會比較一目瞭然。

 

 

 

   Event Producer 產生資料,這個時候就帶上EventTime了,這個時候比如使用者訪問的記錄,訪問的時間就是EventTime。然後放入了MessageQueue-訊息佇列,進入到Flink Source的時候可以生成IngetionTime,也就是被Flink "吞" 進入時的時間,可以這麼理解一下。然後由Source再進入到Operator-運算元,也就是將資料進行轉換,如Map, FlatMap等操作,這個時候每進入一個Operator都會生成一個時間即ProcessingTime。

  IngestionTime和ProcessingTime都是生成的,所以時間是升序的,裡邊的時間戳timestamp和水位線Watermark都是自動生成的,所以不用考慮這個。而EventTime與其他兩個有些差異,它可以是升序的,也可以不是無序的。

  假如一個訊息佇列來了帶著事件時間,時間為: 1, 2,3,4, 5。 這個加入是資料過來的時間順序,如果需要統計2秒之間的資料的話,那麼就會產生的視窗資料為[1,2], [3,4] [5],這個有序時間。

  多數情況下,從訊息佇列過來的資料一般時間一般沒有順序。比如過來的資料事件時間為 1,3,2,4,5,那麼我們想要正確2秒的資料,我們就需要引入Watermark, 水位線一說,這個水位線的含義就是當資料達到了這個水位線的時候才觸發真正的資料統計,對於視窗來講,也就是將視窗進行關閉然後進行統計。假如我們允許最大的延遲時間為1秒,那麼這些資料將會分成:

  1, 3, 2 | 水位線 |  4,5 | 水位線 |

  1 -> 分到1-2的視窗中。

  3 -> 新建立一個視窗(3-4),然後分到3-4的視窗中。

  2 -> 分到1-2的視窗看。

  水位線 -> 進行視窗統計和資料彙總。

  4 -> 分到3-4的視窗。

  5 -> 新建一個視窗(5-6),然後分配到新視窗中。

  不知道這樣寫大家能不能理解呢,如果覺得有問題的話可以給我留言。

  上面的這樣是延遲資料的處理機制,當然還有並行流處理的情況,這種情況下有的資料慢,有的資料快,那麼eventTime小的資料會先流到下一個運算元上,下面事件時間14和29在到window的時候,那麼14會先流到window進行處理,

  在Source之後會產生對應的watermark,不同的source接入不同的資料將會分別產生對應的watermark,當然watermark也需要遵循著從小到大進行觸發,保證資料的正確處理。

 

 

 

 

 

 

 

 

 

 

 

 

 

  Watermark的設定:

  一種是Punctuated Watermark, 翻譯過來應該是“間斷的水位線”,咱們來看下原文

  To generate watermarks whenever a certain event indicates that a new watermark might be generated, use AssignerWithPunctuatedWatermarks. For this class Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method on that element.

  如果資料是間斷性的,那麼可以使用這個作為產生watermark的方式。如果一直有資料且EventTime是遞增的話,那麼每條資料就會產生一條資料,這種情況下會對系統造成負載,所以連續產生資料的情況下使用這種不合適。這個方法首先呼叫的是extractTimestamp用於抽取時間戳,checkAndGetNextWatermark用於檢查和生成下一個水位線。

  

  第二種是Periodic Watermark,翻譯過來是“週期性水位線”,看下原文

  AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).

  週期性的獲取timestamp和生成watermark。可以依賴流元素的時間,比如EventTime或者ProcessingTime。這個介面先呼叫extractTimestamp方法獲取timestamp,接著呼叫getCurrentWatermark生成相應的時間戳。

  

  這種週期性水位線有如下三種實現:

  1)AscendingTimestampExtractor,如果資料產生的時間是升序的,可以使用這個實現獲取timestamp和生成watermark。這種情況下,如果有資料升序中有小於當前時間戳的事件時,比如1,2,3,2,4,在這種情況下資料2將會丟失。丟失的資料可以通過sideOutputLateData獲取到。

  2)BoundedOutOfOrdernessTimestampExtractor,如果資料是無需的,可以使用這個實現,指定相應的延遲時間。

  3)IngestionTimeExtractor, 這個是當指定時間特性為IngestionTime時,直接生成時間戳和獲取水印。

  

  下面寫一個例子,進一步加深理解。以下是通過建立一個socket服務端,通過資料資料進行資料展示,資料分為word和時間戳來演示,首先指定時間特性為EventTime,預設的時間特性為ProcessingTime。將單詞和時間戳進行解析拆分進行FlatMap進行資料解析成WordCount類,分配時間戳和生成水印,按word欄位進行拆分,統計5秒鐘的滾動視窗資料做reduce,最後是列印和輸出。

package com.hqs.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

/**
 * @author huangqingshi
 * @Date 2020-01-11
 */
public class SocketEventTime {


    public static void main(String[] args) throws Exception{
        //建立env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設定流的時間特性,
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //設定並行度
        env.setParallelism(1);
        //設定監聽localhost:9000埠,以回車分割
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        DataStream<SocketWindowCount.WordCount> wordCountStream = text.
                flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {
                        String[] args = value.split(",");
                        out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));
                    }
                }).


                assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SocketWindowCount.WordCount>() {



                    long currentTimeStamp = 0L;
                    //允許的最大延遲時間,單位為毫秒
                    long maxDelayAllowed = 0L;
                    long currentWaterMark;


                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
//                        System.out.println("當前waterMark:" + currentWaterMark);
                        return new Watermark(currentWaterMark);
                    }

                    @Override
                    public long extractTimestamp(SocketWindowCount.WordCount wordCount, long l) {

                        long timeStamp = Long.parseLong(wordCount.timestamp);
                        currentTimeStamp = Math.max(currentTimeStamp, timeStamp);

                        System.out.println("Key:" + wordCount.word + ",EventTime:" + timeStamp + ",前一條資料的水位線:" + currentWaterMark
                                + ",當前水位線:" + (currentTimeStamp - maxDelayAllowed));
                        return timeStamp;
                    }


                });

        DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.
                keyBy("word").
        window(TumblingEventTimeWindows.of(Time.seconds(5))).


                        reduce(new ReduceFunction<SocketWindowCount.WordCount>() {
                            @Override
                            public SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {

//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);
                                t1.timestamp = wordCount.timestamp + "," + t1.timestamp;
                                return t1;
                            }
                        });

        //將結果集進行列印
        windowsCounts.print();

        //提交所設定的執行
        env.execute("EventTime Example");

    }


    public static class WordCount {

        public String word;
        public String timestamp;

        public static SocketWindowCount.WordCount of(String word, String timestamp) {
            SocketWindowCount.WordCount wordCount = new SocketWindowCount.WordCount();
            wordCount.word = word;
            wordCount.timestamp = timestamp;
            return wordCount;
        }

        @Override
        public String toString() {
            return "word:" + word + " timestamp:" + timestamp;
        }
    }


}

  使用nc命令建立一個socket連線並且輸入資料,前邊為單詞,後邊為timestamp時間戳,大家可以轉換為時間:      

huangqingshideMacBook-Pro:~ huangqingshi$ nc -lk 9000
hello,1553503185000
hello,1553503186000
hello,1553503187000
hello,1553503188000
hello,1553503189000
hello,1553503190000
hello,1553503187000
hello,1553503191000
hello,1553503192000
hello,1553503193000
hello,1553503194000
hello,1553503195000

  輸出的結果如下,從上邊我們看到最大延遲時間maxDelayAllowed為0秒,也就意味著採用升序的獲取,等於使用AscendingTimestampExtractor,每來一條資料即生成一個時間戳和水位。因為中間有一條資料為155350318700,小於上邊的資料,所以這條資料丟失了。當5秒的時候觸發一個window時間,即資料的結果輸出。

Key:hello,EventTime:1553503185000,前一條資料的水位線:0,當前水位線:1553503185000
Key:hello,EventTime:1553503186000,前一條資料的水位線:1553503185000,當前水位線:1553503186000
Key:hello,EventTime:1553503187000,前一條資料的水位線:1553503186000,當前水位線:1553503187000
Key:hello,EventTime:1553503188000,前一條資料的水位線:1553503187000,當前水位線:1553503188000
Key:hello,EventTime:1553503189000,前一條資料的水位線:1553503188000,當前水位線:1553503189000
Key:hello,EventTime:1553503190000,前一條資料的水位線:1553503189000,當前水位線:1553503190000
word:hello timestamp:1553503185000,1553503186000,1553503187000,1553503188000,1553503189000
Key:hello,EventTime:1553503187000,前一條資料的水位線:1553503190000,當前水位線:1553503190000
Key:hello,EventTime:1553503191000,前一條資料的水位線:1553503190000,當前水位線:1553503191000
Key:hello,EventTime:1553503192000,前一條資料的水位線:1553503191000,當前水位線:1553503192000
Key:hello,EventTime:1553503193000,前一條資料的水位線:1553503192000,當前水位線:1553503193000
Key:hello,EventTime:1553503194000,前一條資料的水位線:1553503193000,當前水位線:1553503194000
Key:hello,EventTime:1553503195000,前一條資料的水位線:1553503194000,當前水位線:1553503195000
word:hello timestamp:1553503190000,1553503191000,1553503192000,1553503193000,1553503194000

  下面咱們調整下最大延遲時間程式碼:

//允許的最大延遲時間,單位為毫秒
long maxDelayAllowed = 5000L;

  咱們來看下輸出的結果,這次資料有了上邊丟失的資料了。

Key:hello,EventTime:1553503185000,前一條資料的水位線:-5000,當前水位線:1553503180000
Key:hello,EventTime:1553503186000,前一條資料的水位線:1553503180000,當前水位線:1553503181000
Key:hello,EventTime:1553503187000,前一條資料的水位線:1553503181000,當前水位線:1553503182000
Key:hello,EventTime:1553503188000,前一條資料的水位線:1553503182000,當前水位線:1553503183000
Key:hello,EventTime:1553503189000,前一條資料的水位線:1553503183000,當前水位線:1553503184000
Key:hello,EventTime:1553503190000,前一條資料的水位線:1553503184000,當前水位線:1553503185000
Key:hello,EventTime:1553503187000,前一條資料的水位線:1553503185000,當前水位線:1553503185000
Key:hello,EventTime:1553503191000,前一條資料的水位線:1553503185000,當前水位線:1553503186000
Key:hello,EventTime:1553503191000,前一條資料的水位線:1553503186000,當前水位線:1553503186000
Key:hello,EventTime:1553503192000,前一條資料的水位線:1553503186000,當前水位線:1553503187000
Key:hello,EventTime:1553503193000,前一條資料的水位線:1553503187000,當前水位線:1553503188000
Key:hello,EventTime:1553503194000,前一條資料的水位線:1553503188000,當前水位線:1553503189000
Key:hello,EventTime:1553503195000,前一條資料的水位線:1553503189000,當前水位線:1553503190000
word:hello timestamp:1553503185000,1553503186000,1553503187000,1553503188000,1553503189000,1553503187000

  下面咱們來分析下上面的結果,第一條資料的時間為45秒整,上邊的資料基本上是連續的,只有一條資料 1553503187000為47秒的時候出現了亂序中。再來回憶一下上邊的程式碼,上邊的資料延遲為5秒,統計的資料為5秒的滾動視窗的資料,將時間戳合起來。

  那麼第一個彙總的視窗為[2019-03-25 16:39:45 2019-03-25 16:39:50),那麼資料在什麼時間觸發視窗呢,也就是在輸入1553503195000的時候進行的視窗彙總, 這條資料的時間為2019-03-25 16:39:55,水位線為2019-03-25 16:39:50,由此我們得出結論:

  當統計時間window視窗中有資料的時候,watermark時間 >= 視窗的結束時間時進行觸發。

  如果想使用IngestionTime設定為時間特性的話,只需要更改幾行程式碼即可。  

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<SocketWindowCount.WordCount> wordCountStream = text.
                flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {
                        String[] args = value.split(",");
                        out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));
                    }
                }).


                assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

 DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.
                keyBy("word").
                timeWindow(Time.seconds(5L)).


                reduce(new ReduceFunction<SocketWindowCount.WordCount>() {
                    @Override
                    public SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {

//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);
                        t1.timestamp = wordCount.timestamp + "," + t1.timestamp;
                        return t1;
                    }
                });

  如果要使用ProcessingTime,同理把時間特性改一下即可。完整的程式碼如下,紅色的程式碼為改變的程式碼。

package com.hqs.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.sql.Timestamp;

/**
 * @author huangqingshi
 * @Date 2020-01-11
 */
public class SocketIngestionTime {

    public static void main(String[] args) throws Exception {
        //建立env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設定流的時間特性,
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //設定並行度
        env.setParallelism(1);
        //設定監聽localhost:9000埠,以回車分割
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        DataStream<SocketWindowCount.WordCount> wordCountStream = text.
                flatMap(new FlatMapFunction<String, SocketWindowCount.WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<SocketWindowCount.WordCount> out) throws Exception {
                        String[] args = value.split(",");
                        out.collect(SocketWindowCount.WordCount.of(args[0], args[1]));
                    }
                });

        DataStream<SocketWindowCount.WordCount> windowsCounts = wordCountStream.
                keyBy("word").
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).


                reduce(new ReduceFunction<SocketWindowCount.WordCount>() {
                    @Override
                    public SocketWindowCount.WordCount reduce(SocketWindowCount.WordCount wordCount, SocketWindowCount.WordCount t1) throws Exception {

//                                System.out.println("reduce:" + wordCount.timestamp + "," + t1.timestamp);
                        t1.timestamp = wordCount.timestamp + "," + t1.timestamp;
                        return t1;
                    }
                });

        //將結果集進行列印
        windowsCounts.print();

        //提交所設定的執行
        env.execute("EventTime Example");

    }


    public static class WordCount {

        public String word;
        public String timestamp;

        public static SocketWindowCount.WordCount of(String word, String timestamp) {
            SocketWindowCount.WordCount wordCount = new SocketWindowCount.WordCount();
            wordCount.word = word;
            wordCount.timestamp = timestamp;
            return wordCount;
        }

        @Override
        public String toString() {
            return "word:" + word + " timestamp:" + timestamp;
        }
    }
}

  好了,如果有什麼問題,可以留言或加我微信與我聯絡。