Flink基礎(六):Flink 中的 Window
阿新 • • 發佈:2020-08-03
1 Window
1.1 Window 概述 streaming 流式計算是一種被設計用於處理無限資料集的資料處理引擎,而無限資料集是指一種不斷增長的本質上無限的資料集,而 window 是一種切割無限資料 為有限塊進行處理的手段。 Window 是無限資料流處理的核心,Window 將一個無限的 stream 拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。 1.2 Window 型別 Window 可以分成兩類: CountWindow:按照指定的資料條數生成一個 Window,與時間無關。 TimeWindow:按照時間生成 Window。 對於 TimeWindow,可以根據視窗實現原理的不同分成三類:滾動視窗(TumblingWindow)、滑動視窗(Sliding Window)和會話視窗(Session Window)。 1. 滾動視窗(Tumbling Windows) 將資料依據固定的視窗長度對資料進行切片。 特點:時間對齊,視窗長度固定,沒有重疊。2 Window API
2.1 TimeWindow TimeWindow 是將指定時間範圍內的所有資料組成一個 window,一次對一個window 裡面的所有資料進行計算。 1. 滾動視窗 Flink 預設的時間視窗根據 Processing Time 進行視窗的劃分,將 Flink 獲取到的資料根據進入 Flink 的時間劃分到不同的視窗中。val minTempPerWindow = dataStream .map(rView Code 時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。 2. 滑動視窗(SlidingEventTimeWindows) 滑動視窗和滾動視窗的函式名是完全一致的,只是在傳引數時需要傳入兩個引數,一個是 window_size,一個是 sliding_size。下面程式碼中的 sliding_size 設定為了 5s,也就是說,視窗每 5s 就計算一次,每 一次計算的 window 範圍是 15s 內的所有元素。=> (r.id, r.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(15)) .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
val minTempPerWindow: DataStream[(String, Double)] = dataStream .map(r => (r.id, r.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(15), Time.seconds(5)) .reduce((r1, r2) => (r1._1, r1._2.min(r2._2))) .window(EventTimeSessionWindows.withGap(Time.minutes(10))View Code 時間間隔可以通過 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。 2.2 CountWindow CountWindow 根據視窗中相同 key 元素的數量來觸發執行,執行時只計算元素數量達到視窗大小的 key 對應的結果。 注意:CountWindow 的 window_size 指的是相同 Key 的元素的個數,不是輸入的所有元素的總數。 1 滾動視窗 預設的 CountWindow 是一個滾動視窗,只需要指定視窗大小即可,當元素數量達到視窗大小時,就會觸發視窗的執行。
val minTempPerWindow: DataStream[(String, Double)] = dataStream .map(r => (r.id, r.temperature)) .keyBy(_._1) .countWindow(5) .reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))2 滑動視窗 滑動視窗和滾動視窗的函式名是完全一致的,只是在傳引數時需要傳入兩個引數,一個是 window_size,一個是 sliding_size。 下面程式碼中的 sliding_size 設定為了 2,也就是說,每收到兩個相同 key 的資料就計算一次,每一次計算的 window 範圍是 5 個元素。
val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0) //每當某一個 key 的個數達到 2 的時候,觸發計算,計算最近該 key 最近 10 個元素的內容 val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2) val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)2.3 window function window function 定義了要對視窗中收集的資料做的計算操作,主要可以分為兩類: 增量聚合函式(incremental aggregation functions) 每條資料到來就進行計算,保持一個簡單的狀態。典型的增量聚合函式有 ReduceFunction, AggregateFunction。 全視窗函式(full window functions) 先把視窗所有資料收集起來,等到計算的時候會遍歷所有資料。 ProcessWindowFunction 就是一個全視窗函式。 2.4 其它可選 API .trigger() —— 觸發器 定義 window 什麼時候關閉,觸發計算並輸出結果 .evitor() —— 移除器 定義移除某些資料的邏輯 .allowedLateness() —— 允許處理遲到的資料 .sideOutputLateData() —— 將遲到的資料放入側輸出流 .getSideOutput() —— 獲取側輸出流