1. 程式人生 > >[Flink]Flink1.3 Stream指南三 視窗分配器

[Flink]Flink1.3 Stream指南三 視窗分配器

Windows(視窗)是處理無限資料流的核心。Windows將流分解成有限大小的"桶",在上面我們可以進行計算。本文件重點介紹如何在Flink中處理視窗,以及如何從它提供的功能中獲得最大的收益。

視窗Flink程式的一般結構如下。第一個片段指的是指定key的資料流,而第二個到未指定key的資料流。可以看出,唯一的區別是指定key的資料流呼叫keyBy()以及window()方法變為未指定key資料流下的windowAll()方法。

Keyed Windows:

stream
       .keyBy(...)          <-  keyed versus non-keyed windows
.window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness()] <- optional, else zero .reduce/fold/apply() <- required: "function"

Non-Keyed Windows:

stream
       .windowAll(...)      <-  required: "assigner"
      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
      [.allowedLateness()]  <-  optional, else zero
       .reduce/fold/apply() <-  required: "function"

在上面,方括號([...])中的命令是可選的。這表明Flink允許你可以以多種不同的方式自定義你的視窗邏輯,以便更好的滿足你的需求。

1. 視窗生命週期

簡而言之,一旦屬於此視窗的第一個元素到達,就會建立一個視窗,當時間(事件時間或處理時間)到達其結束時間加上使用者指定的允許延遲時間,視窗將被完全刪除(a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness)。Flink保證僅對基於時間的視窗進行刪除,而不適用於其他型別的視窗,例如,全域性視窗。 舉個例子,使用基於事件時間的視窗策略,每5分鐘建立不重疊視窗,並且允許可以有1分鐘的延遲時間,當時間戳落在12:00至12:05時間間隔的第一個元素到達時,Flink將為這個時間間隔建立一個新視窗,當時間戳到達12:06時,視窗將被刪除(Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp)。

另外,每個視窗都將有一個觸發器和一個函式(WindowFunction,ReduceFunction或FoldFunction)(參閱下面的Window函式)。該函式包含應用於視窗內容的計算,而觸發器指定了視窗使用該函式的條件,即指定了哪個視窗應用該函式。觸發策略可能是"當視窗中的元素個數大於4"時,或"當watermark通過視窗末尾"時。觸發器還可以決定在建立視窗和刪除視窗之間的任何時間清除視窗內容。在這種情況下,清除僅指視窗中的元素,而不是視窗元資料。這意味著新資料仍然可以新增到該視窗。

除了上述之外,你還可以指定一個Evictor(參閱下面的Evictors),在觸發器觸發之後以及在應用該函式之前和/或之後從視窗中移除元素。

下面我們將詳細介紹上述各個元件。我們從上面的程式碼片段(Keyed Windows,Non-Keyed Windows, Window Assigner以及Window Function)開始,然後再到可選部分。

2. Keyed vs Non-Keyed Windows

首先要指定的第一件事就是你的資料流是否應該指定key。這必須在定義視窗之前完成。使用keyBy()可以將無限資料流分解成指定key的邏輯資料流(split your infinite stream into logical keyed streams)。如果未呼叫keyBy(),則你的資料流未指定key。

在指定key的資料流的情況下,你傳入進來的事件的任何屬性都可以用作key(更多細節)。指定key的資料流可以允許通過多個任務並行執行視窗計算,因為每個邏輯資料流可以獨立於其餘的進行處理。引用相同key的所有元素將被髮送到相同的並行任務。

在未指定key的資料流的情況下,你的原始資料流不會被分割成多個邏輯資料流,並且所有視窗邏輯將由單個任務執行,即以1個的並行度執行。

3. 視窗分配器

在資料流是否指定key之後,下一步是定義視窗分配器。視窗分配器定義元素如何分配給視窗。這通過在window()(指定key資料流)或windowAll()(未指定key資料流)呼叫中指定你選擇的視窗分配器來完成。

視窗分配器負責將每個傳入元素分配給一個或多個視窗。Flink內建一些用於解決常見用例的視窗分配器,有滾動視窗,滑動視窗,會話視窗和全域性視窗。你還可以通過繼承WindowAssigner類實現自定義視窗分配器。所有內建視窗分配器(全域性視窗除外)根據時間將元素分配給視窗,可以是處理時間或事件時間。請參閱事件時間,瞭解處理時間和事件時間之間的差異以及如何生成時間戳和watermarks

在下文中,我們將展示Flink的內建視窗分配器的工作原理以及它們在DataStream程式中的使用方式。 以下圖形視覺化每個分配器的執行。 紫色圓圈表示資料流中的元素,它們被某些key(在我們這個例子下為使用者1,使用者2和使用者3)分割槽。x軸顯示時間進度。

3.1 滾動視窗

滾動視窗分配器將每個元素分配給固定視窗大小的視窗。滾動視窗大小固定的並且不重疊。例如,如果指定大小為5分鐘的滾動視窗,則將執行當前視窗,並且每五分鐘將啟動一個新視窗,如下圖所示:

以下程式碼顯示如何使用滾動視窗:

Java版本:

DataStream<T> input = ...;

// 基於事件事件的滾動視窗
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 基於處理時間的滾動視窗
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)// tumbling processing-time windowsinput    .keyBy(<key selector>)    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))    .<windowed transformation>(<window function>)// daily tumbling event-time windows offset by -8 hours.input    .keyBy(<key selector>)    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))    .<windowed transformation>(<window function>)

可以通過使用Time.milliseconds(x),Time.seconds(x),Time.minutes(x)其中一個來指定時間間隔。

如上一個例子所示,滾動視窗分配器還可以使用一個可選的偏移量引數,可以用來改變視窗的對齊方式。例如,沒有小時偏移量的情況下,滾動視窗與epoch對齊(aligned with epoch),那麼你將獲得如1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999等視窗。如果你想改變,你可以給一個偏移。以15分鐘的偏移量為例,那麼你將獲得1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999等。偏移量的一個重要用例是將視窗調整為時區而不是UTC-0。例如,在中國,你必須指定Time.hours(-8)的偏移量。

3.2 滑動視窗

滑動視窗分配器將每個元素分配給固定視窗大小的視窗。類似於滾動視窗分配器,視窗的大小由視窗大小引數配置。另外一個視窗滑動引數控制滑動視窗的啟動頻率(how frequently a sliding window is started)。因此,如果滑動大小小於視窗大小,滑動窗可以重疊。在這種情況下,元素被分配到多個視窗。

例如,你可以使用視窗大小為10分鐘的視窗,滑動大小為5分鐘。這樣,每5分鐘會生成一個視窗,包含最後10分鐘內到達的事件,如下圖所示。

以下程式碼顯示如何使用滑動視窗:

Java版本:

DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)// sliding processing-time windowsinput    .keyBy(<key selector>)    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))    .<windowed transformation>(<window function>)// sliding processing-time windows offset by -8 hoursinput    .keyBy(<key selector>)    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))    .<windowed transformation>(<window function>)

可以通過使用Time.milliseconds(x),Time.seconds(x),Time.minutes(x)其中一個來指定時間間隔。

如上一個例子所示,滑動視窗分配器還可以使用一個可選的偏移量引數,可以用來改變視窗的對齊方式。例如,沒有小時偏移量並且滑動視窗大小為30分鐘的例子下,你將獲得如1:00:00.000 - 1:59:59.9991:30:00.000 - 2:29:59.999等視窗。如果你想改變,你可以給一個偏移。以15分鐘的偏移量為例,那麼你將獲得1:15:00.000 - 2:14:59.9991:45:00.000 - 2:44:59.999等。偏移量的一個重要用例是將視窗調整為時區而不是UTC-0。例如,在中國,你必須指定Time.hours(-8)的偏移量。

3.3 會話視窗

會話視窗分配器通過活動會話分組元素。與滾動視窗和滑動視窗相比,會話視窗不會重疊,也沒有固定的開始和結束時間。相反,當會話視窗在一段時間內沒有接收到元素時會關閉,例如,不活動的間隙時。會話視窗分配器配置會話間隙,定義所需的不活動時間長度(defines how long is the required period of inactivity)。當此時間段到期時,當前會話關閉,後續元素被分配到新的會話視窗。

以下程式碼顯示如何使用會話視窗:

Java版本:

DataStream<T> input = ...;

// event-time session windows
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// processing-time session windows
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...

// event-time session windows
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// processing-time session windows
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

可以通過使用Time.milliseconds(x),Time.seconds(x),Time.minutes(x)其中一個來指定時間間隔。

由於會話視窗沒有固定的開始時間和結束時間,因此它們的執行與滾動視窗和滑動視窗不同。在內部,會話視窗運算元為每個到達記錄建立一個新視窗,如果它們之間的距離比定義的間隙更近,則將視窗合併在一起。為了可合併,會話視窗運算元需要一個合併觸發器和合並視窗函式,例如ReduceFunctionWindowFunctionFoldFunction無法合併)。

3.4 全域性視窗

全域性視窗分配器將具有相同key的所有元素分配給相同的單個全域性視窗。此視窗方案僅在你同時指定自定義觸發器時有用。否則,不會執行計算,因為全域性視窗不具有我們可以處理聚合元素的自然結束(the global window does not have a natural end at which we could process the aggregated elements.)。

以下程式碼顯示如何使用會話視窗:

Java版本:

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

Scala版本:

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

備註:

Flink版本:1.3