Flink-Window
Flink的高階API
Flink的基石
Flink之所以能這麼流行,離不開它最重要的四個基石:Checkpoint、State、Time、Window。
Checkpoint
這是Flink最重要的一個特性。
Flink基於Chandy-Lamport演算法實現了一個分散式的一致性的快照,從而提供了一致性的語義。
Chandy-Lamport演算法實際上在1985年的時候已經被提出來,但並沒有被很廣泛的應用,而Flink則把這個演算法發揚光大了。
Spark最近在實現Continue streaming,Continue streaming的目的是為了降低處理的延時,其也需要提供這種一致性的語義,最終也採用了Chandy-Lamport這個演算法,說明Chandy-Lamport演算法在業界得到了一定的肯定。
State
提供了一致性的語義之後,Flink為了讓使用者在程式設計時能夠更輕鬆、更容易地去管理狀態,還提供了一套非常簡單明瞭的State API,包括ValueState、ListState、MapState,BroadcastState。
Time
除此之外,Flink還實現了Watermark的機制,能夠支援基於事件的時間的處理,能夠容忍遲到/亂序的資料。
Window
另外流計算中一般在對流資料進行操作之前都會先進行開窗,即基於一個什麼樣的視窗上做這個計算。Flink提供了開箱即用的各種視窗,比如滑動視窗、滾動視窗、會話視窗以及非常靈活的自定義的視窗。
Flink-Window操作
為什麼需要Window
在流處理應用中,資料是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。
在這種情況下,我們必須定義一個視窗(window),用來收集最近1分鐘內的資料,並對這個視窗內的資料進行計算。
Window的分類
按照time和count分類
- time-window:時間視窗:根據時間劃分視窗,如:每xx分鐘統計最近xx分鐘的資料
- count-window:數量視窗:根據數量劃分視窗,如:每xx個數據統計最近xx個數據
按照slide和size分類
視窗有兩個重要的屬性: 視窗大小size和滑動間隔slide,根據它們的大小關係可分為:
- tumbling-window:滾動視窗:size=slide,如:每隔10s統計最近10s的資料
- sliding-window:滑動視窗:size>slide,如:每隔5s統計最近10s的資料
總結:
按照上面視窗的分類方式進行組合,可以得出如下的視窗:
1.基於時間的滾動視窗tumbling-time-window--用的較多
2.基於時間的滑動視窗sliding-time-window--用的較多
3.基於數量的滾動視窗tumbling-count-window--用的較少
4.基於數量的滑動視窗sliding-count-window--用的較少
注意:Flink還支援一個特殊的視窗:Session會話視窗,需要設定一個會話超時時間,如30s,則表示30s內沒有資料到來,則觸發上個視窗的計算
Window的API
window和windowAll
使用keyby的流,應該使用window方法
未使用keyby的流,應該呼叫windowAll方法
WindowAssigner
window/windowAll 方法接收的輸入是一個 WindowAssigner, WindowAssigner 負責將每條輸入的資料分發到正確的 window 中,
Flink提供了很多各種場景用的WindowAssigner:
如果需要自己定製資料分發策略,則可以實現一個 class,繼承自 WindowAssigner。
evictor
evictor 主要用於做一些資料的自定義操作,可以在執行使用者程式碼之前,也可以在執行
使用者程式碼之後,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。
Flink 提供瞭如下三種通用的 evictor:
-
CountEvictor 保留指定數量的元素
-
TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 範圍內的元素,其中 max_ts 是視窗內時間戳的最大值。
-
DeltaEvictor 通過執行使用者給定的 DeltaFunction 以及預設的 theshold,判斷是否刪除一個元素。
trigger
trigger 用來判斷一個視窗是否需要被觸發,每個 WindowAssigner 都自帶一個預設的trigger,
如果預設的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的介面以及含義:
-
onElement() 每次往 window 增加一個元素的時候都會觸發
-
onEventTime() 當 event-time timer 被觸發的時候會呼叫
-
onProcessingTime() 當 processing-time timer 被觸發的時候會呼叫
-
onMerge() 對兩個 `rigger 的 state 進行 merge 操作
-
clear() window 銷燬的時候被呼叫
上面的介面中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選擇:
-
CONTINUE 不做任何事情
-
FIRE 觸發 window
-
PURGE 清空整個 window 的元素並銷燬視窗
-
FIRE_AND_PURGE 觸發視窗,然後銷燬視窗
基於時間的滾動和滑動視窗
時間的滾動視窗
/**
* @author WGR
* @create 2021/9/10 -- 9:24
*/
public class WindowDemo01_TimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketDS = env.socketTextStream("192.168.1.180", 9998);
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口/訊號燈通過紅綠燈汽車的數量--基於時間的滾動視窗
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum("count");
result1.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//訊號燈id
private Integer count;//通過該訊號燈的車的數量
}
}
結果:
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=1)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=9)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=18)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=17)
7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=10)
時間的滑動視窗
/**
* @author WGR
* @create 2021/9/10 -- 9:42
*/
public class WindowDemo02_TimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketDS = env.socketTextStream("192.168.1.180", 9998);
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 需求2:每3秒鐘統計一次,最近5秒鐘內,各個路口/訊號燈通過紅綠燈汽車的數量--基於時間的滑動視窗
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId)
.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))
.sum("count");
result1.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//訊號燈id
private Integer count;//通過該訊號燈的車的數量
}
}
結果:
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=3) 1+2
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=10) 1+2+3+4
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=15) 4+5+6
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=21) 6+7+8
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=27) 8+9+10
7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=10) 10
基於數量的滾動和滑動視窗
數量的滾動視窗
/**
* @author WGR
* @create 2021/9/10 -- 9:53
*/
public class WindowDemo03_CountWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketDS = env.socketTextStream("192.168.1.180", 9998);
SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 需求1:統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基於數量的滾動視窗
SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId)
.countWindow(5L)
.sum("count");
result1.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//訊號燈id
private Integer count;//通過該訊號燈的車的數量
}
}
結果:需要等到id為1的數量達到5個的時候才會觸發。
7> WindowDemo03_CountWindow.CartInfo(sensorId=1, count=6)
1,1
1,1
1,1
1,1
2,1
1,2
數量的滑動視窗
/**
* @author WGR
* @create 2021/9/10 -- 9:53
*/
public class WindowDemo04_CountWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketDS = env.socketTextStream("192.168.1.180", 9998);
SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, WindowDemo03_CountWindow.CartInfo>() {
@Override
public WindowDemo03_CountWindow.CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new WindowDemo03_CountWindow.CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 統計在最近5條訊息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基於數量的滑動視窗
SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> result1 = cartInfoDS.keyBy(WindowDemo03_CountWindow.CartInfo::getSensorId)
.countWindow(5L,3L)
.sum("count");
result1.print();
env.execute();
}
}
結果:
7> WindowDemo03_CountWindow.CartInfo(sensorId=1, count=3)
1,1
1,1
2,1
1,1
2,1
3,1
4,1
基於會話視窗
設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算
/**
* @author WGR
* @create 2021/9/10 -- 10:09
*/
public class WindowDemo05_SessionWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketDS = env.socketTextStream("192.168.1.180", 9998);
SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, WindowDemo03_CountWindow.CartInfo>() {
@Override
public WindowDemo03_CountWindow.CartInfo map(String value) throws Exception {
String[] arr = value.split(",");
return new WindowDemo03_CountWindow.CartInfo(arr[0], Integer.parseInt(arr[1]));
}
});
// 需求:設定會話超時時間為10s,10s內沒有資料到來,則觸發上個視窗的計算(前提是上一個視窗得有資料!)
SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> result1 = cartInfoDS.keyBy(WindowDemo03_CountWindow.CartInfo::getSensorId)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.sum("count");
result1.print();
env.execute();
}
}