1. 程式人生 > 其它 >Flink-Window

Flink-Window

Flink的高階API

Flink的基石

Flink之所以能這麼流行,離不開它最重要的四個基石:Checkpoint、State、Time、Window。

Checkpoint

這是Flink最重要的一個特性。

Flink基於Chandy-Lamport演算法實現了一個分散式的一致性的快照,從而提供了一致性的語義。

Chandy-Lamport演算法實際上在1985年的時候已經被提出來,但並沒有被很廣泛的應用,而Flink則把這個演算法發揚光大了。

Spark最近在實現Continue streaming,Continue streaming的目的是為了降低處理的延時,其也需要提供這種一致性的語義,最終也採用了Chandy-Lamport這個演算法,說明Chandy-Lamport演算法在業界得到了一定的肯定。

State

提供了一致性的語義之後,Flink為了讓使用者在程式設計時能夠更輕鬆、更容易地去管理狀態,還提供了一套非常簡單明瞭的State API,包括ValueState、ListState、MapState,BroadcastState。

Time

除此之外,Flink還實現了Watermark的機制,能夠支援基於事件的時間的處理,能夠容忍遲到/亂序的資料。

Window

另外流計算中一般在對流資料進行操作之前都會先進行開窗,即基於一個什麼樣的視窗上做這個計算。Flink提供了開箱即用的各種視窗,比如滑動視窗、滾動視窗、會話視窗以及非常靈活的自定義的視窗。

為什麼需要Window

在流處理應用中,資料是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。
在這種情況下,我們必須定義一個視窗(window),用來收集最近1分鐘內的資料,並對這個視窗內的資料進行計算。

Window的分類

按照time和count分類

  • time-window:時間視窗:根據時間劃分視窗,如:每xx分鐘統計最近xx分鐘的資料
  • count-window:數量視窗:根據數量劃分視窗,如:每xx個數據統計最近xx個數據

按照slide和size分類

視窗有兩個重要的屬性: 視窗大小size和滑動間隔slide,根據它們的大小關係可分為:

  • tumbling-window:滾動視窗:size=slide,如:每隔10s統計最近10s的資料
  • sliding-window:滑動視窗:size>slide,如:每隔5s統計最近10s的資料

總結:

按照上面視窗的分類方式進行組合,可以得出如下的視窗:

1.基於時間的滾動視窗tumbling-time-window--用的較多

2.基於時間的滑動視窗sliding-time-window--用的較多

3.基於數量的滾動視窗tumbling-count-window--用的較少

4.基於數量的滑動視窗sliding-count-window--用的較少

注意:Flink還支援一個特殊的視窗:Session會話視窗,需要設定一個會話超時時間,如30s,則表示30s內沒有資料到來,則觸發上個視窗的計算

Window的API

window和windowAll

使用keyby的流,應該使用window方法

未使用keyby的流,應該呼叫windowAll方法

WindowAssigner

window/windowAll 方法接收的輸入是一個 WindowAssigner, WindowAssigner 負責將每條輸入的資料分發到正確的 window 中,

Flink提供了很多各種場景用的WindowAssigner:

如果需要自己定製資料分發策略,則可以實現一個 class,繼承自 WindowAssigner。

evictor

evictor 主要用於做一些資料的自定義操作,可以在執行使用者程式碼之前,也可以在執行

使用者程式碼之後,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。

Flink 提供瞭如下三種通用的 evictor:

  • CountEvictor 保留指定數量的元素

  • TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 範圍內的元素,其中 max_ts 是視窗內時間戳的最大值。

  • DeltaEvictor 通過執行使用者給定的 DeltaFunction 以及預設的 theshold,判斷是否刪除一個元素。

trigger

trigger 用來判斷一個視窗是否需要被觸發,每個 WindowAssigner 都自帶一個預設的trigger,

如果預設的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的介面以及含義:

  • onElement() 每次往 window 增加一個元素的時候都會觸發

  • onEventTime() 當 event-time timer 被觸發的時候會呼叫

  • onProcessingTime() 當 processing-time timer 被觸發的時候會呼叫

  • onMerge() 對兩個 `rigger 的 state 進行 merge 操作

  • clear() window 銷燬的時候被呼叫

上面的介面中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選擇:

  • CONTINUE 不做任何事情

  • FIRE 觸發 window

  • PURGE 清空整個 window 的元素並銷燬視窗

  • FIRE_AND_PURGE 觸發視窗,然後銷燬視窗

基於時間的滾動和滑動視窗

時間的滾動視窗

/**
 * @author WGR
 * @create 2021/9/10 -- 9:24
 */
public class WindowDemo01_TimeWindow {

    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);

        SingleOutputStreamOperator<CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, CartInfo>() {

            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口/訊號燈通過紅綠燈汽車的數量--基於時間的滾動視窗
        SingleOutputStreamOperator<CartInfo> result1  = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum("count");

        result1.print();
        env.execute();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//訊號燈id
        private Integer count;//通過該訊號燈的車的數量
    }
}

結果:

7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=1)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=9)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=18)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=17)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=10)

時間的滑動視窗

/**
 * @author WGR
 * @create 2021/9/10 -- 9:42
 */
public class WindowDemo02_TimeWindow {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);

        SingleOutputStreamOperator<CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, CartInfo>() {

            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 需求2:每3秒鐘統計一次,最近5秒鐘內,各個路口/訊號燈通過紅綠燈汽車的數量--基於時間的滑動視窗
        SingleOutputStreamOperator<CartInfo> result1  = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))
                .sum("count");

        result1.print();
        env.execute();

    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//訊號燈id
        private Integer count;//通過該訊號燈的車的數量
    }
}

結果:

7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=3)   1+2
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=10) 1+2+3+4
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=15) 4+5+6
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=21) 6+7+8
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=27)  8+9+10
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=10) 10

基於數量的滾動和滑動視窗

數量的滾動視窗

/**
 * @author WGR
 * @create 2021/9/10 -- 9:53
 */
public class WindowDemo03_CountWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);

        SingleOutputStreamOperator<CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, CartInfo>() {

            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 需求1:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基於數量的滾動視窗
        SingleOutputStreamOperator<CartInfo> result1  = cartInfoDS.keyBy(CartInfo::getSensorId)
                .countWindow(5L)
                .sum("count");

        result1.print();
        env.execute();
    }



    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//訊號燈id
        private Integer count;//通過該訊號燈的車的數量
    }
}

結果:需要等到id為1的數量達到5個的時候才會觸發。

7> WindowDemo03_CountWindow.CartInfo(sensorId=1, count=6)
1,1
1,1
1,1
1,1
2,1
1,2

數量的滑動視窗

/**
 * @author WGR
 * @create 2021/9/10 -- 9:53
 */
public class WindowDemo04_CountWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);

        SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, WindowDemo03_CountWindow.CartInfo>() {

            @Override
            public WindowDemo03_CountWindow.CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new WindowDemo03_CountWindow.CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基於數量的滑動視窗
        SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> result1  = cartInfoDS.keyBy(WindowDemo03_CountWindow.CartInfo::getSensorId)
                .countWindow(5L,3L)
                .sum("count");

        result1.print();
        env.execute();
    }

}

結果:

7> WindowDemo03_CountWindow.CartInfo(sensorId=1, count=3)
1,1
1,1
2,1
1,1
2,1
3,1
4,1

基於會話視窗

設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算

/**
 * @author WGR
 * @create 2021/9/10 -- 10:09
 */
public class WindowDemo05_SessionWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);

        SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, WindowDemo03_CountWindow.CartInfo>() {

            @Override
            public WindowDemo03_CountWindow.CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new WindowDemo03_CountWindow.CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 需求:設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算(前提是上一個視窗得有資料!)
        SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> result1  = cartInfoDS.keyBy(WindowDemo03_CountWindow.CartInfo::getSensorId)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");

        result1.print();
        env.execute();
    }
}