1. 程式人生 > 其它 >Flink window

Flink window

視窗計算

我們經常需要在一個時間視窗維度上對資料進行聚合,視窗是流處理應用中經常需要解決的問題。Flink的視窗運算元為我們提供了方便易用的API,我們可以將資料流切分成一個個視窗,對視窗內的資料進行處理

按照有沒有進行keyby分成了兩種 不同的處理方式:

  1. 首先,我們要決定是否對一個DataStream按照Key進行分組,這一步必須在視窗計算之前進行。

  2. windowAll不對資料流進行分組,所有資料將傳送到後續執行的運算元單個例項上。

  3. 經過windowAll的運算元是不分組的視窗(Non-Keyed Window),它們的原理和操作與Keyed Window類似,唯一的區別在於所有資料將傳送給下游的單個例項,或者說下游運算元的並行度為1。

// Keyed Window
stream
       .keyBy(...)               <-  按照一個Key進行分組
       .window(...)              <-  將資料流中的元素分配到相應的視窗中
      [.trigger(...)]            <-  指定觸發器Trigger(可選)
      [.evictor(...)]            <-  指定清除器Evictor(可選)
       .reduce/aggregate/process()      <-  視窗處理函式Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分組,將資料流中的所有元素分配到相應的視窗中
      [.trigger(...)]            <-  指定觸發器Trigger(可選)
      [.evictor(...)]            <-  指定清除器Evictor(可選)
       .reduce/aggregate/process()      <-  視窗處理函式Window Function

視窗生命週期

  • 一個視窗在第一個屬於它的元素到達時就會被建立,然後在時間(event 或 processing time) 超過視窗的“結束時間戳 + 使用者定義的 allowed lateness (詳見 Allowed Lateness)”時 被完全刪除.

對於一個基於 event time 且範圍互不重合(滾動)的視窗策略, 如果視窗設定的時長為五分鐘、可容忍的遲到時間(allowed lateness)為 1 分鐘, 那麼第一個元素落入 12:00 至 12:05 這個區間時,Flink 就會為這個區間建立一個新的視窗。 當 watermark 越過 12:06 時,這個視窗將被摧毀。

  • 每個視窗會設定自己的 Trigger 和 function (ProcessWindowFunction、ReduceFunction、或 AggregateFunction, )。該 function 決定如何計算視窗中的內容, 而 Trigger 決定何時視窗中的資料可以被 function 計算

  • 也可以指定一個 Evictor ),在 trigger 觸發之後,Evictor 可以在視窗函式的前後刪除資料。

Window Assigners

  • Window assigner 定義了 stream 中的元素如何被分發到各個視窗

  • Flink 為最常用的情況提供了一些定義好的 window assigner,也就是 tumbling windows、 sliding windows、 session windows 和 global windows。

  • 可以繼承 WindowAssigner 類來實現自定義的 window assigner。 所有內建的 window assigner(除了 global window)都是基於時間分發資料的,processing time 或 event time 均可

  • 基於時間的視窗用 start timestamp(包含)和 end timestamp(不包含)描述視窗的大小。 在程式碼中,Flink 處理基於時間的視窗使用的是 TimeWindow, 它有查詢開始和結束 timestamp 以及返回視窗所能儲存的最大 timestamp 的方法 maxTimestamp()

滾動視窗(Tumbling Windows)

滾動視窗的大小是固定的,且各自範圍之間不重疊

val input: DataStream[T] = ...

// 滾動 event-time 視窗
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// 滾動 processing-time 視窗
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// 長度為一天的滾動 event-time 視窗,偏移量為 -8 小時。
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

滑動視窗(Sliding Windows)

視窗大小是固定的,視窗有可能有重疊。視窗會有一個滑動步長


al input: DataStream[T] = ...

// 滑動 event-time 視窗
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// 滑動 processing-time 視窗
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// 滑動 processing-time 視窗,偏移量為 -8 小時
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

會話視窗(Session Windows)

視窗大小不固定,視窗之間會有一個間隙(gap).會話視窗根據Session gap切分不同的視窗,當一個視窗在大於Session gap的時間內沒有接收到新資料時,視窗將關閉。在這種模式下,視窗的長度是可變的,每個視窗的開始和結束時間並不是確定的

val input: DataStream[T] = ...

// 設定了固定間隔的 event-time 會話視窗
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// 設定了動態間隔的 event-time 會話視窗
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // 決定並返回會話間隔
      }
    }))
    .<windowed transformation>(<window function>)

// 設定了固定間隔的 processing-time 會話視窗
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// 設定了動態間隔的 processing-time 會話視窗
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // 決定並返回會話間隔
      }
    }))
    .<windowed transformation>(<window function>)

全域性視窗(Global Windows)

整個資料流是一個視窗,因為資料流是無界的,所以全域性視窗預設情況下,永遠不會觸發計算資料, 要定義trigger

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

視窗函式

視窗函式主要分為兩種,一種是增量計算,如reduce和aggregate,一種是全量計算,如process。

  1. 增量計算指的是視窗儲存一份中間資料,每流入一個新元素,新元素與中間資料兩兩合一,生成新的中間資料,再儲存到視窗中

2.全量計算指的是視窗先快取該視窗所有元素,等到觸發條件後對視窗內的全量元素執行計算

ReduceFunction

ReduceFunction 指定兩條輸入資料如何合併起來產生一條輸出資料,輸入和輸出資料的型別必須相同。 Flink 使用 ReduceFunction 對視窗中的資料進行增量聚合。

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情況。 AggregateFunction 接收三個型別:輸入資料的型別(IN)、累加器的型別(ACC)和輸出資料的型別(OUT)。 輸入資料的型別是輸入流的元素型別,AggregateFunction 介面有如下幾個方法: 把每一條元素加進累加器、建立初始累加器、合併兩個累加器、從累加器中提取輸出(OUT 型別

class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

ProcessWindowFunction

ProcessWindowFunction 有能獲取包含視窗內所有元素的 Iterable, 以及用來獲取時間和狀態資訊的 Context 物件,比其他視窗函式更加靈活。 ProcessWindowFunction 的靈活性是以效能和資源消耗為代價的, 因為視窗中的資料無法被增量聚合,而需要在視窗觸發前快取所有資料。

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以與 ReduceFunction 或 AggregateFunction 搭配使用,它就可以增量聚合視窗的元素並且從 ProcessWindowFunction` 中獲得視窗的元資料。


val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((context.window.getStart, min))
      }
  )

Triggers

Trigger 決定了一個視窗(由 window assigner 定義)何時可以被 window function 處理

Trigger 介面提供了五個方法來響應不同的事件:

  • onElement() 方法在每個元素被加入視窗時呼叫。
  • onEventTime() 方法在註冊的 event-time timer 觸發時呼叫。
  • onProcessingTime() 方法在註冊的 processing-time timer 觸發時呼叫。
  • onMerge() 方法與有狀態的 trigger 相關。該方法會在兩個視窗合併時, 將視窗對應 trigger 的狀態進行合併,比如使用會話視窗時。
  • clear() 方法處理在對應視窗被移除時所需的邏輯。

Evictors

Flink 的視窗模型允許在 WindowAssigner 和 Trigger 之外指定可選的 Evictor。 如本文開篇的程式碼中所示,通過 evictor(...) 方法傳入 Evictor。 Evictor 可以在 trigger 觸發後、呼叫視窗函式之前或之後從視窗中刪除元素

Flink 內建有三個 evictor:

  • CountEvictor: 僅記錄使用者指定數量的元素,一旦視窗中的元素超過這個數量,多餘的元素會從視窗快取的開頭移除

  • DeltaEvictor: 接收 DeltaFunction 和 threshold 引數,計算最後一個元素與視窗快取中所有元素的差值, 並移除差值大於或等於 threshold 的元素。

  • TimeEvictor: 接收 interval 引數,以毫秒錶示。 它會找到視窗中元素的最大 timestamp max_ts 並移除比 max_ts - interval 小的所有元素。

預設情況下,所有內建的 evictor 邏輯都在呼叫視窗函式前執行。

Allowed Lateness

預設情況下,watermark 一旦越過視窗結束的 timestamp,遲到的資料就會被直接丟棄。 但是 Flink 允許指定視窗運算元最大的 allowed lateness。 Allowed lateness 定義了一個元素可以在遲到多長時間的情況下不被丟棄,這個引數預設是 0。 在 watermark 超過視窗末端、到達視窗末端加上 allowed lateness 之前的這段時間內到達的元素, 依舊會被加入視窗。取決於視窗的 trigger,一個遲到但沒有被丟棄的元素可能會再次觸發視窗,比如 EventTimeTrigger

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>)

關於狀態大小的考量

  1. Flink 會為一個元素在它所屬的每一個視窗中都建立一個副本
    ,設定一個大小為一天、滑動距離為一秒的滑動視窗可能不是個好想法

  2. educeFunction 和 AggregateFunction 可以極大地減少儲存需求,因為他們會就地聚合到達的元素, 且每個視窗僅儲存一個值。而使用 ProcessWindowFunction 需要累積視窗中所有的元素

  3. 使用 Evictor 可以避免預聚合, 因為視窗中的所有資料必須先經過 evictor 才能進行計算

Reference

  1. https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/windows/