1. 程式人生 > 實用技巧 >Flink基礎(六):Flink 中的 Window

Flink基礎(六):Flink 中的 Window

1 Window

1.1 Window 概述   streaming 流式計算是一種被設計用於處理無限資料集的資料處理引擎,而無限資料集是指一種不斷增長的本質上無限的資料集,而 window 是一種切割無限資料 為有限塊進行處理的手段。   Window 是無限資料流處理的核心,Window 將一個無限的 stream 拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。 1.2 Window 型別   Window 可以分成兩類:    CountWindow:按照指定的資料條數生成一個 Window,與時間無關。    TimeWindow:按照時間生成 Window。   對於 TimeWindow,可以根據視窗實現原理的不同分成三類:滾動視窗(TumblingWindow)、滑動視窗(Sliding Window)和會話視窗(Session Window)。   1. 滾動視窗(Tumbling Windows)   將資料依據固定的視窗長度對資料進行切片。   特點:時間對齊,視窗長度固定,沒有重疊。
  滾動視窗分配器將每個元素分配到一個指定視窗大小的視窗中,滾動視窗有一個固定的大小,並且不會出現重疊。例如:如果你指定了一個 5 分鐘大小的滾動窗 口,視窗的建立如下圖所示: 適用場景:適合做 BI 統計等(做每個時間段的聚合計算)。   2. 滑動視窗(Sliding Windows)   滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。   特點:時間對齊,視窗長度固定,可以有重疊。   滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另一個視窗滑動引數控制滑動視窗開始的頻率。因此,滑動視窗如果滑動引數小於視窗大小的話,視窗是可以重疊的,在這種情況下元素會被分配到多個視窗中。   例如,你有 10 分鐘的視窗和 5 分鐘的滑動,那麼每個視窗中 5 分鐘的窗口裡包含著上個 10 分鐘產生的資料,如下圖所示:   適用場景:對最近一個時間段內的統計(求某介面最近 5min 的失敗率來決定是否要報警)。 3. 會話視窗(Session Windows)   由一系列事件組合一個指定時間長度的 timeout 間隙組成,類似於 web 應用的session,也就是一段時間沒有接收到新資料就會生成新的視窗。   特點:時間無對齊。
  session 視窗分配器通過 session 活動來對元素進行分組,session 視窗跟滾動視窗和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間週期內不再收到元素,即非活動間隔產生,那個這個視窗就會關閉。一個 session 視窗通過一個 session 間隔來配置,這個 session 間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的 session 將關閉並且後續的元素將 被分配到新的 session 視窗中去。

2 Window API

2.1 TimeWindow   TimeWindow 是將指定時間範圍內的所有資料組成一個 window,一次對一個window 裡面的所有資料進行計算。 1. 滾動視窗   Flink 預設的時間視窗根據 Processing Time 進行視窗的劃分,將 Flink 獲取到的資料根據進入 Flink 的時間劃分到不同的視窗中。
val minTempPerWindow = dataStream
 .map(r 
=> (r.id, r.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(15)) .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
View Code   時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。 2. 滑動視窗(SlidingEventTimeWindows)   滑動視窗和滾動視窗的函式名是完全一致的,只是在傳引數時需要傳入兩個引數,一個是 window_size,一個是 sliding_size。下面程式碼中的 sliding_size 設定為了 5s,也就是說,視窗每 5s 就計算一次,每 一次計算的 window 範圍是 15s 內的所有元素。
val minTempPerWindow: DataStream[(String, Double)] = dataStream
 .map(r => (r.id, r.temperature))
 .keyBy(_._1)
 .timeWindow(Time.seconds(15), Time.seconds(5))
 .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
.window(EventTimeSessionWindows.withGap(Time.minutes(10))
View Code   時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。 2.2 CountWindow   CountWindow 根據視窗中相同 key 元素的數量來觸發執行,執行時只計算元素數量達到視窗大小的 key 對應的結果。   注意:CountWindow 的 window_size 指的是相同 Key 的元素的個數,不是輸入的所有元素的總數。 1 滾動視窗   預設的 CountWindow 是一個滾動視窗,只需要指定視窗大小即可,當元素數量達到視窗大小時,就會觸發視窗的執行。
val minTempPerWindow: DataStream[(String, Double)] = dataStream
 .map(r => (r.id, r.temperature))
 .keyBy(_._1)
 .countWindow(5)
 .reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
2 滑動視窗   滑動視窗和滾動視窗的函式名是完全一致的,只是在傳引數時需要傳入兩個引數,一個是 window_size,一個是 sliding_size。   下面程式碼中的 sliding_size 設定為了 2,也就是說,每收到兩個相同 key 的資料就計算一次,每一次計算的 window 範圍是 5 個元素。
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => 
(r.id, r.temperature)).keyBy(0)
//每當某一個 key 的個數達到 2 的時候,觸發計算,計算最近該 key 最近 10 個元素的內容
val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = 
keyedStream.countWindow(10,2)
val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)
2.3 window function   window function 定義了要對視窗中收集的資料做的計算操作,主要可以分為兩類:  增量聚合函式(incremental aggregation functions)   每條資料到來就進行計算,保持一個簡單的狀態。典型的增量聚合函式有   ReduceFunction, AggregateFunction。  全視窗函式(full window functions)   先把視窗所有資料收集起來,等到計算的時候會遍歷所有資料。   ProcessWindowFunction 就是一個全視窗函式。 2.4 其它可選 API  .trigger() —— 觸發器   定義 window 什麼時候關閉,觸發計算並輸出結果  .evitor() —— 移除器   定義移除某些資料的邏輯  .allowedLateness() —— 允許處理遲到的資料  .sideOutputLateData() —— 將遲到的資料放入側輸出流  .getSideOutput() —— 獲取側輸出流