Flink window
視窗計算
我們經常需要在一個時間視窗維度上對資料進行聚合,視窗是流處理應用中經常需要解決的問題。Flink的視窗運算元為我們提供了方便易用的API,我們可以將資料流切分成一個個視窗,對視窗內的資料進行處理
按照有沒有進行keyby分成了兩種 不同的處理方式:
-
首先,我們要決定是否對一個DataStream按照Key進行分組,這一步必須在視窗計算之前進行。
-
windowAll不對資料流進行分組,所有資料將傳送到後續執行的運算元單個例項上。
-
經過windowAll的運算元是不分組的視窗(Non-Keyed Window),它們的原理和操作與Keyed Window類似,唯一的區別在於所有資料將傳送給下游的單個例項,或者說下游運算元的並行度為1。
// Keyed Window stream .keyBy(...) <- 按照一個Key進行分組 .window(...) <- 將資料流中的元素分配到相應的視窗中 [.trigger(...)] <- 指定觸發器Trigger(可選) [.evictor(...)] <- 指定清除器Evictor(可選) .reduce/aggregate/process() <- 視窗處理函式Window Function // Non-Keyed Window stream .windowAll(...) <- 不分組,將資料流中的所有元素分配到相應的視窗中 [.trigger(...)] <- 指定觸發器Trigger(可選) [.evictor(...)] <- 指定清除器Evictor(可選) .reduce/aggregate/process() <- 視窗處理函式Window Function
視窗生命週期
- 一個視窗在第一個屬於它的元素到達時就會被建立,然後在時間(event 或 processing time) 超過視窗的“結束時間戳 + 使用者定義的 allowed lateness (詳見 Allowed Lateness)”時 被完全刪除.
對於一個基於 event time 且範圍互不重合(滾動)的視窗策略, 如果視窗設定的時長為五分鐘、可容忍的遲到時間(allowed lateness)為 1 分鐘, 那麼第一個元素落入 12:00 至 12:05 這個區間時,Flink 就會為這個區間建立一個新的視窗。 當 watermark 越過 12:06 時,這個視窗將被摧毀。
-
每個視窗會設定自己的 Trigger 和 function (ProcessWindowFunction、ReduceFunction、或 AggregateFunction, )。該 function 決定如何計算視窗中的內容, 而 Trigger 決定何時視窗中的資料可以被 function 計算
-
也可以指定一個 Evictor ),在 trigger 觸發之後,Evictor 可以在視窗函式的前後刪除資料。
Window Assigners
-
Window assigner 定義了 stream 中的元素如何被分發到各個視窗
-
Flink 為最常用的情況提供了一些定義好的 window assigner,也就是 tumbling windows、 sliding windows、 session windows 和 global windows。
-
可以繼承 WindowAssigner 類來實現自定義的 window assigner。 所有內建的 window assigner(除了 global window)都是基於時間分發資料的,processing time 或 event time 均可
-
基於時間的視窗用 start timestamp(包含)和 end timestamp(不包含)描述視窗的大小。 在程式碼中,Flink 處理基於時間的視窗使用的是 TimeWindow, 它有查詢開始和結束 timestamp 以及返回視窗所能儲存的最大 timestamp 的方法 maxTimestamp()
滾動視窗(Tumbling Windows)
滾動視窗的大小是固定的,且各自範圍之間不重疊
val input: DataStream[T] = ...
// 滾動 event-time 視窗
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// 滾動 processing-time 視窗
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// 長度為一天的滾動 event-time 視窗,偏移量為 -8 小時。
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
滑動視窗(Sliding Windows)
視窗大小是固定的,視窗有可能有重疊。視窗會有一個滑動步長
al input: DataStream[T] = ...
// 滑動 event-time 視窗
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// 滑動 processing-time 視窗
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// 滑動 processing-time 視窗,偏移量為 -8 小時
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
會話視窗(Session Windows)
視窗大小不固定,視窗之間會有一個間隙(gap).會話視窗根據Session gap切分不同的視窗,當一個視窗在大於Session gap的時間內沒有接收到新資料時,視窗將關閉。在這種模式下,視窗的長度是可變的,每個視窗的開始和結束時間並不是確定的
val input: DataStream[T] = ...
// 設定了固定間隔的 event-time 會話視窗
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// 設定了動態間隔的 event-time 會話視窗
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// 決定並返回會話間隔
}
}))
.<windowed transformation>(<window function>)
// 設定了固定間隔的 processing-time 會話視窗
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// 設定了動態間隔的 processing-time 會話視窗
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// 決定並返回會話間隔
}
}))
.<windowed transformation>(<window function>)
全域性視窗(Global Windows)
整個資料流是一個視窗,因為資料流是無界的,所以全域性視窗預設情況下,永遠不會觸發計算資料, 要定義trigger
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
視窗函式
視窗函式主要分為兩種,一種是增量計算,如reduce和aggregate,一種是全量計算,如process。
- 增量計算指的是視窗儲存一份中間資料,每流入一個新元素,新元素與中間資料兩兩合一,生成新的中間資料,再儲存到視窗中
2.全量計算指的是視窗先快取該視窗所有元素,等到觸發條件後對視窗內的全量元素執行計算
ReduceFunction
ReduceFunction 指定兩條輸入資料如何合併起來產生一條輸出資料,輸入和輸出資料的型別必須相同。 Flink 使用 ReduceFunction 對視窗中的資料進行增量聚合。
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
AggregateFunction
ReduceFunction 是 AggregateFunction 的特殊情況。 AggregateFunction 接收三個型別:輸入資料的型別(IN)、累加器的型別(ACC)和輸出資料的型別(OUT)。 輸入資料的型別是輸入流的元素型別,AggregateFunction 介面有如下幾個方法: 把每一條元素加進累加器、建立初始累加器、合併兩個累加器、從累加器中提取輸出(OUT 型別
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
ProcessWindowFunction
ProcessWindowFunction 有能獲取包含視窗內所有元素的 Iterable, 以及用來獲取時間和狀態資訊的 Context 物件,比其他視窗函式更加靈活。 ProcessWindowFunction 的靈活性是以效能和資源消耗為代價的, 因為視窗中的資料無法被增量聚合,而需要在視窗觸發前快取所有資料。
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
增量聚合的 ProcessWindowFunction
ProcessWindowFunction 可以與 ReduceFunction 或 AggregateFunction 搭配使用,它就可以增量聚合視窗的元素並且從 ProcessWindowFunction` 中獲得視窗的元資料。
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((context.window.getStart, min))
}
)
Triggers
Trigger 決定了一個視窗(由 window assigner 定義)何時可以被 window function 處理
Trigger 介面提供了五個方法來響應不同的事件:
- onElement() 方法在每個元素被加入視窗時呼叫。
- onEventTime() 方法在註冊的 event-time timer 觸發時呼叫。
- onProcessingTime() 方法在註冊的 processing-time timer 觸發時呼叫。
- onMerge() 方法與有狀態的 trigger 相關。該方法會在兩個視窗合併時, 將視窗對應 trigger 的狀態進行合併,比如使用會話視窗時。
- clear() 方法處理在對應視窗被移除時所需的邏輯。
Evictors
Flink 的視窗模型允許在 WindowAssigner 和 Trigger 之外指定可選的 Evictor。 如本文開篇的程式碼中所示,通過 evictor(...) 方法傳入 Evictor。 Evictor 可以在 trigger 觸發後、呼叫視窗函式之前或之後從視窗中刪除元素
Flink 內建有三個 evictor:
-
CountEvictor: 僅記錄使用者指定數量的元素,一旦視窗中的元素超過這個數量,多餘的元素會從視窗快取的開頭移除
-
DeltaEvictor: 接收 DeltaFunction 和 threshold 引數,計算最後一個元素與視窗快取中所有元素的差值, 並移除差值大於或等於 threshold 的元素。
-
TimeEvictor: 接收 interval 引數,以毫秒錶示。 它會找到視窗中元素的最大 timestamp max_ts 並移除比 max_ts - interval 小的所有元素。
預設情況下,所有內建的 evictor 邏輯都在呼叫視窗函式前執行。
Allowed Lateness
預設情況下,watermark 一旦越過視窗結束的 timestamp,遲到的資料就會被直接丟棄。 但是 Flink 允許指定視窗運算元最大的 allowed lateness。 Allowed lateness 定義了一個元素可以在遲到多長時間的情況下不被丟棄,這個引數預設是 0。 在 watermark 超過視窗末端、到達視窗末端加上 allowed lateness 之前的這段時間內到達的元素, 依舊會被加入視窗。取決於視窗的 trigger,一個遲到但沒有被丟棄的元素可能會再次觸發視窗,比如 EventTimeTrigger
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
關於狀態大小的考量
-
Flink 會為一個元素在它所屬的每一個視窗中都建立一個副本
,設定一個大小為一天、滑動距離為一秒的滑動視窗可能不是個好想法 -
educeFunction 和 AggregateFunction 可以極大地減少儲存需求,因為他們會就地聚合到達的元素, 且每個視窗僅儲存一個值。而使用 ProcessWindowFunction 需要累積視窗中所有的元素
-
使用 Evictor 可以避免預聚合, 因為視窗中的所有資料必須先經過 evictor 才能進行計算