Flink 中極其重要的 Time 與 Window 詳細解析(深度好文,建議收藏)
阿新 • • 發佈:2021-01-25
### 前言
Flink 是流式的、實時的 計算引擎
上面一句話就有兩個概念,一個是流式,一個是實時。
**流式**:就是資料來源源不斷的流進來,也就是資料沒有邊界,但是我們計算的時候必須在一個有邊界的範圍內進行,所以這裡面就有一個問題,邊界怎麼確定? 無非就兩種方式,**根據時間段或者資料量進行確定**,根據時間段就是每隔多長時間就劃分一個邊界,根據資料量就是每來多少條資料劃分一個邊界,Flink 中就是這麼劃分邊界的,本文會詳細講解。
**實時**:就是資料傳送過來之後立馬就進行相關的計算,然後將結果輸出。這裡的計算有兩種:
- **一種是隻有邊界內的資料進行計算**,這種好理解,比如統計每個使用者最近五分鐘內瀏覽的新聞數量,就可以取最近五分鐘內的所有資料,然後根據每個使用者分組,統計新聞的總數。
- **另一種是邊界內資料與外部資料進行關聯計算**,比如:統計最近五分鐘內瀏覽新聞的使用者都是來自哪些地區,這種就需要將五分鐘內瀏覽新聞的使用者資訊與 hive 中的地區維表進行關聯,然後在進行相關計算。
本篇文章所講的 Flink 的內容就是圍繞以上概念進行詳細剖析的!
## Time與Window
### Time
在Flink中,如果以時間段劃分邊界的話,那麼時間就是一個極其重要的欄位。
Flink中的時間有三種類型,如下圖所示:
![](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_1.png)
- **Event Time**:是事件建立的時間。它通常由事件中的時間戳描述,例如採集的日誌資料中,每一條日誌都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。
- **Ingestion Time**:是資料進入Flink的時間。
- **Processing Time**:是每一個執行基於時間操作的運算元的本地系統時間,與機器相關,預設的時間屬性就是Processing Time。
例如,一條日誌進入Flink的時間為2021-01-22 10:00:00.123,到達Window的系統時間為2021-01-22 10:00:01.234,日誌的內容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
對於業務來說,要統計1min內的故障日誌個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日誌的生成時間進行統計。
### Window
Window,即視窗,我們前面一直提到的邊界就是這裡的Window(視窗)。
官方解釋:**流式計算是一種被設計用於處理無限資料集的資料處理引擎,而無限資料集是指一種不斷增長的本質上無限的資料集,而window是一種切割無限資料為有限塊進行處理的手段**。
所以**Window是無限資料流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作**。
#### Window型別
本文剛開始提到,劃分視窗就兩種方式:
1. 根據時間進行擷取(time-driven-window),比如每1分鐘統計一次或每10分鐘統計一次。
2. 根據資料進行擷取(data-driven-window),比如每5個數據統計一次或每50個數據統計一次。
![視窗型別](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_2.png)
對於TimeWindow(根據時間劃分視窗), 可以根據視窗實現原理的不同分成三類:**滾動視窗(Tumbling Window)、滑動視窗(Sliding Window)和會話視窗(Session Window)**。
1. **滾動視窗(Tumbling Windows)**
將資料依據固定的視窗長度對資料進行切片。
特點:**時間對齊,視窗長度固定,沒有重疊**。
滾動視窗分配器將每個元素分配到一個指定視窗大小的視窗中,滾動視窗有一個固定的大小,並且不會出現重疊。
例如:如果你指定了一個5分鐘大小的滾動視窗,視窗的建立如下圖所示:
![滾動視窗](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_3.png)
適用場景:適合做BI統計等(做每個時間段的聚合計算)。
2. **滑動視窗(Sliding Windows)**
滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。
特點:**時間對齊,視窗長度固定,有重疊**。
滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另一個視窗滑動引數控制滑動視窗開始的頻率。因此,滑動視窗如果滑動引數小於視窗大小的話,視窗是可以重疊的,在這種情況下元素會被分配到多個視窗中。
例如,你有10分鐘的視窗和5分鐘的滑動,那麼每個視窗中5分鐘的窗口裡包含著上個10分鐘產生的資料,如下圖所示:
![滑動視窗](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_9.png)
適用場景:對最近一個時間段內的統計(求某介面最近5min的失敗率來決定是否要報警)。
3. **會話視窗(Session Windows)**
由一系列事件組合一個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新資料就會生成新的視窗。
特點:**時間無對齊**。
session視窗分配器通過session活動來對元素進行分組,session視窗跟滾動視窗和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況,相反,**當它在一個固定的時間週期內不再收到元素,即非活動間隔產生,那個這個視窗就會關閉**。一個session視窗通過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session視窗中去。
![會話視窗](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_10.png)
### Window API
#### TimeWindow
TimeWindow是將指定時間範圍內的所有資料組成一個window,一次對一個window裡面的所有資料進行計算(就是本文開頭說的對一個邊界內的資料進行計算)。
我們以 **紅綠燈路口通過的汽車數量** 為例子:
紅綠燈路口會有汽車通過,一共會有多少汽車通過,無法計算。因為車流源源不斷,計算沒有邊界。
所以我們統計每15秒鐘通過紅路燈的汽車數量,如第一個15秒為2輛,第二個15秒為3輛,第三個15秒為1輛 ...
- **tumbling-time-window (無重疊資料)**
我們使用 Linux 中的 nc 命令模擬資料的傳送方
```shell
1.開啟發送埠,埠號為9999
nc -lk 9999
2.傳送內容(key 代表不同的路口,value 代表每次通過的車輛)
一次傳送一行,傳送的時間間隔代表汽車經過的時間間隔
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
```
Flink 進行採集資料並計算:
```
object Window {
def main(args: Array[String]): Unit = {
//TODO time-window
//1.建立執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定義資料流來源
val text = env.socketTextStream("localhost", 9999)
//3.轉換資料格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
line => {
val tokens = line.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.執行統計操作,每個sensorId一個tumbling視窗,視窗的大小為5秒
//也就是說,每5秒鐘統計一次,在這過去的5秒鐘內,各個路口通過紅綠燈汽車的數量。
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.timeWindow(Time.seconds(5))
.sum("carCnt")
//5.顯示統計結果
ds2.print()
//6.觸發流計算
env.execute(this.getClass.getName)
}
}
```
我們傳送的資料並沒有指定時間欄位,所以Flink使用的是預設的 Processing Time,也就是Flink系統處理資料時的時間。
- **sliding-time-window (有重疊資料)**
```
//1.建立執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定義資料流來源
val text = env.socketTextStream("localhost", 9999)
//3.轉換資料格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
line => {
val tokens = line.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.執行統計操作,每個sensorId一個sliding視窗,視窗時間10秒,滑動時間5秒
//也就是說,每5秒鐘統計一次,在這過去的10秒鐘內,各個路口通過紅綠燈汽車的數量。
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum("carCnt")
//5.顯示統計結果
ds2.print()
//6.觸發流計算
env.execute(this.getClass.getName)
```
#### CountWindow
CountWindow根據視窗中相同key元素的數量來觸發執行,執行時只計算元素數量達到視窗大小的key對應的結果。
**注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數**。
- **tumbling-count-window (無重疊資料)**
```
//1.建立執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定義資料流來源
val text = env.socketTextStream("localhost", 9999)
//3.轉換資料格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
(f) => {
val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.執行統計操作,每個sensorId一個tumbling視窗,視窗的大小為5
//按照key進行收集,對應的key出現的次數達到5次作為一個結果
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.countWindow(5)
.sum("carCnt")
//5.顯示統計結果
ds2.print()
//6.觸發流計算
env.execute(this.getClass.getName)
```
***
- **sliding-count-window (有重疊資料)**
同樣也是視窗長度和滑動視窗的操作:視窗長度是5,滑動長度是3
```
//1.建立執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定義資料流來源
val text = env.socketTextStream("localhost", 9999)
//3.轉換資料格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
(f) => {
val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.執行統計操作,每個sensorId一個sliding視窗,視窗大小3條資料,視窗滑動為3條資料
//也就是說,每個路口分別統計,收到關於它的3條訊息時統計在最近5條訊息中,各自路口通過的汽車數量
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.countWindow(5, 3)
.sum("carCnt")
//5.顯示統計結果
ds2.print()
//6.觸發流計算
env.execute(this.getClass.getName)
```
***
- **Window 總結**
1. flink支援兩種劃分視窗的方式(time和count)
- 如果根據時間劃分視窗,那麼它就是一個time-window
- 如果根據資料劃分視窗,那麼它就是一個count-window
2. flink支援視窗的兩個重要屬性(size和interval)
- 如果size=interval,那麼就會形成tumbling-window(無重疊資料)
- 如果size>interval,那麼就會形成sliding-window(有重疊資料)
-