1. 程式人生 > 其它 >Stream Processing with Apache Flink中文版-- 第6章 基於時間和視窗的操作符

Stream Processing with Apache Flink中文版-- 第6章 基於時間和視窗的操作符

在本章中,我們將介紹用於時間處理和基於時間的操作符(如windows)的DataStream API方法。正如您在“時間語義”中學到的,Flink基於時間的操作符可以應用於不同的時間概念。

首先,我們將學習如何定義時間特徵、時間戳和水印。然後,我們將介紹流程功能、提供對時間戳和水印的訪問並可以註冊定時器的低階轉換。接下來,我們將使用Flink的視窗API,它提供了最常見視窗型別的內建實現。您還將瞭解自定義、使用者定義的視窗操作和核心視窗結構,如assigners, triggers和evictors.。最後,我們將討論如何按時加入流以及處理延遲事件的策略。

配置時間特徵

要在分散式流處理應用程式中定義時間操作,理解時間的含義是很重要的。當您指定一個視窗來在一分鐘的bucket中收集事件時,每個bucket具體包含哪些事件?在DataStream API中,您可以使用時間特徵來告訴Flink在建立視窗時如何定義時間。時間特徵是StreamExecutionEnvironment的一個屬性,它的取值範圍如下:

處理時間(ProcessingTime)

讓操作符根據執行它們的機器的系統時鐘來確定資料流的當前時間。處理時間視窗基於機器時間觸發,幷包含在此時間點之前到達操作符的所有元素。通常,使用視窗操作的處理時間會導致不確定的結果,因為視窗的內容取決於資料到達的速度。此設定提供非常低的延遲,因為處理任務不必等待水印(事件時間)時間。

事件時間(EventTime)

讓操作符通過使用來自資料本身的資訊來確定當前時間。每個事件都帶有一個時間戳,系統的邏輯時間由水印定義。正如您在“時間戳”章節中瞭解到的,時間戳要麼在進入資料處理管道之前就存在於資料中,要麼由源應用程式分配。當水印宣告某個時間間隔內的所有時間戳都已接收時,事件時間視窗將觸發。事件時間視窗計算確定性結果,即使事件發生順序混亂。視窗結果將不依賴於讀取或處理流的速度。

接入時間(IngestionTime)

將源操作符的處理時間指定為每個接入記錄的事件時間時間戳,並自動生成水印。它是EventTime和ProcessingTime的混合。事件的接入時間是它進入流處理器的時間。與事件時間相比,接入時間並沒有提供太多的實用價值,因為它不能提供確定的結果,並且具有與事件時間類似的效能。

示例6-1展示瞭如何通過重新訪問在“Hello, Flink!”中編寫的感測器流應用程式程式碼來設定時間特性。

//例6-1 將時間特性設定為事件時間object AverageSensorReadings {   // main() defines and executes the DataStream program
def main(args: Array[String]) { // set up the streaming execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment // use event time for the application env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // ingest sensor stream val sensorData: DataStream[SensorReading] = env.addSource(...) }}

將時間特徵設定為EventTime可以進行時間戳和水印處理,因此可以進行事件時間操作。當然,如果您選擇EventTime時間特徵,仍然可以使用處理時間視窗和定時器。

如果使用處理時間,使用TimeCharacteristic.ProcessingTime替換TimeCharacteristic.EventTime。

分配時間戳和生成水印

正如在“事件時間處理”章節中所討論的,您的應用程式需要提供兩條重要的資訊給Flink,以便在事件時間中進行操作。每個事件必須與一個時間戳相關聯,該時間戳通常指示事件實際發生的時間。事件時間流還需要攜帶水印,操作符可以從中推斷當前事件時間。

時間戳和水印從1970-01- 01t00:00:00開始以毫秒為單位指定。水印告訴操作符,不希望出現時間戳小於或等於水印的事件。時間戳和水印可以由SourceFunction分配和生成,也可以使用使用者定義的顯式時間戳生成器和水印生成器。在“源函式、時間戳和水印”一節中討論了在SourceFunction中分配時間戳和生成水印。在這裡,我們將解釋如何使用使用者定義的函式來實現這一點。

                   覆蓋源生成的時間戳和水印
如果使用時間戳分配程式,則任何現有的時間戳和水印都將被覆蓋。

DataStream API提供了TimestampAssigner介面,以便在元素被接入到流應用程式後從元素中提取時間戳。通常,時間戳分配程式是在源函式之後立即呼叫的,因為大多數分配程式在生成水印時都對元素的時間戳順序進行了假設。由於元素通常是並行處理的,因此任何引起Flink跨並行流分割槽重新分配元素的操作(如並行性更改、keyBy()或其他顯式重新分配)都會打亂元素的時間戳順序。

最好的做法是分配時間戳,並在儘可能靠近源的地方甚至在SourceFunction內生成水印。根據應用場景,在分配時間戳之前,如果這些操作沒有引起元素的重新分配,可以對輸入流應用初始過濾或轉換。

為了確保事件時間操作的行為符合預期,應該在任何事件時間相關的轉換之前(例如,在第一個事件時間視窗之前)呼叫分配程式。

時間戳分配程式的行為類似於其他轉換操作符。它們在一個數據流上呼叫,併產生一個新的帶有時間戳的資料流和水印。時間戳分配程式不會更改DataStream的資料型別。

示例6-2中的程式碼展示瞭如何使用時間戳分配器。在本例中,讀取流之後,我們首先應用一個過濾器轉換,然後呼叫assignTimestampsAndWatermarks()方法,在這個方法中我們定義了時間戳分配器MyAssigner()。

//例6-2 使用時間戳分配程式val env = StreamExecutionEnvironment.getExecutionEnvironment// set the event time characteristicenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// ingest sensor streamval readings: DataStream[SensorReading] = env.addSource(new SensorSource)// assign timestamps and generate watermarks.assignTimestampsAndWatermarks(new MyAssigner())

在上面的例子中,MyAssigner的型別可以是AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks.型別。這兩個介面擴充套件了DataStream API提供的TimestampAssigner介面。第一個介面定義週期發出水印的分配器,而第二個介面根據輸入事件的屬性注入水印。接下來我們將詳細描述這兩個介面。

週期性水印分配器

週期分配水印意味著我們讓系統發出水印,並在固定的機器時間間隔內提前事件時間。預設的interval設定為200毫秒,但是可以使用ExecutionConfig.setAutoWatermarkInterval()方法來配置它:

val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// generate watermarks every 5 secondsenv.getConfig.setAutoWatermarkInterval(5000)

在前面的示例中,程式每5秒發出一次水印。實際上,每隔5秒,Flink就會呼叫AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回的非空值的時間戳大於前一個水印的時間戳,則轉發新水印。這種檢查是必要的,以確保事件時間不斷增加;否則不會產生水印。

示例6-3顯示了一個具有周期性時間戳的assigner程式,它通過跟蹤到目前為止所看到的最大元素時間戳來生成水印。當請求新的水印時,assigner返回一個最大時間戳減去1分鐘容忍間隔的水印。

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {   val bound: Long = 60 * 1000 // 1 min in ms   var maxTs: Long = Long.MinValue // the maximum observed   timestamp   override def getCurrentWatermark: Watermark = {       // generated watermark with 1 min tolerance       new Watermark(maxTs - bound)  }   override def extractTimestamp(r: SensorReading,previousTS: Long): Long = {       // update maximum timestamp       maxTs = maxTs.max(r.timestamp)       // return record timestamp       r.timestamp  }}

DataStream API為具有周期性水印的時間戳分配程式(assigners)的兩種常見情況提供了實現。如果您的輸入元素具有單調遞增的時間戳,則可以使用快捷方法assignAscendingTimeStamps。此方法使用當前時間戳來生成水印,因為不能出現更早的時間戳。下面演示如何為升序時間戳生成水印:

val stream: DataStream[SensorReading] = ...val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(e => e.timestamp)

週期水印生成的另一種常見情況是,當您知道輸入流中的最大延遲時——元素的時間戳與所有已匯入元素的最大時間戳之間的最大差異。對於這種情況,Flink提供了BoundedOutOfOrdernessTimeStampExtractor,它將最大預期延遲作為引數:

val stream: DataStream[SensorReading] = ...valoutput=stream.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(10))(e=>.timestamp)

在前面的程式碼中,允許元素延遲10秒。這意味著,如果一個元素的事件時間與之前所有元素的最大時間戳之間的差距大於10秒,那麼該元素可能在完成相應的計算併發出結果之後到達進行處理。Flink提供了處理這些延遲事件的不同策略,我們將在“處理延遲資料”中討論這些策略。

PUNCTUATED水印分配器

有時,輸入流包含特殊的元組或標記來指示流的進度。對於這種情況,或者當可以根據輸入元素的其他屬性定義水印時,Flink提供了AssignerWithPunctuatedWatermarks介面。它定義了checkAndGetNextWatermark()方法,該方法在extractTimestamp()之後為每個事件呼叫。該方法可以決定是否生成新的水印。如果方法返回的非空水印大於最新發出的水印,則發出新水印。

示例6-4顯示了一個punctuated水印分配程式(assigner),它為從ID為“sensor_1”的感測器接收到的每個讀數發出水印。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {   val bound: Long = 60 * 1000 // 1 min in ms   override def checkAndGetNextWatermark(r: SensorReading,extractedTS: Long): Watermark = {       if (r.id == "sensor_1") {           // emit watermark if reading is from sensor_1           new Watermark(extractedTS - bound)      } else {           // do not emit a watermark           null      }  }   override def extractTimestamp(r: SensorReading,previousTS: Long): Long = {       // assign record timestamp       r.timestamp  }}

水印、延遲和完整性

到目前為止,我們已經討論瞭如何使用TimestampAssigner生成水印。我們還沒有討論的是水印對您的流應用程式的影響。

水印用於平衡延遲和結果完整性。它們控制在執行計算之前等待資料到達的時間,例如完成視窗計算並輸出結果。基於事件時間的操作符使用水印來確定其讀取記錄的完整性和操作的進度。根據接收到的水印,操作符計算一個時間點,直到它接收到這個時間點前的相關輸入記錄為止。

然而,現實是我們永遠不可能有完美的水印,因為這意味著我們總是可以確定,沒有延遲的記錄。在實踐中,您需要進行有根據的猜測,並使用啟發式方法在應用程式中生成水印。您需要使用關於源、網路和分割槽的所有資訊來估計處理進度和輸入記錄延遲的上限。估計意味著有出錯的空間,在這種情況下,您可能會生成不準確的水印,導致資料延遲或應用程式延遲的不必要增加。記住這一點,您可以使用水印來平衡結果延遲和結果完整性。

如果生成鬆散的水印(水印遠遠落後於處理過的記錄的時間戳),則會增加生成結果的延遲。此時你可以生成一個結果更早的水印。此外,狀態的大小通常會增加,因為應用程式需要緩衝更多的資料,直到可以執行計算為止。然而,在執行計算時,您可以相當肯定所有相關資料都是可用的。

另一方面,如果您生成了非常緊密的水印—這些水印可能比一些後續記錄的時間戳更大—基於時間的計算可能在所有相關資料到達之前執行。雖然這可能會產生不完整或不準確的結果,但結果會及時生成,延遲也較低。

與基於所有資料都可用的前提條件下,構建的批處理應用程式不同,延遲/完整性權衡是流處理應用程式的基本特徵,流處理應用程式處理的是接收到的無界資料。水印是一種功能強大的方法,可以根據時間控制應用程式的行為。除了水印之外,Flink還有許多特性來調整基於時間的操作的確切行為,如process函式和視窗觸發器,並提供了處理延遲資料的不同方法,這些方法將在“處理延遲資料”中討論。

Process 函式

儘管時間資訊和水印對於許多流處理應用程式非常重要,但是您可能已經注意到,我們無法通過目前為止看到的基本DataStream API轉換來使用它們。例如,MapFunction不能訪問時間戳或當前事件時間。

DataStream API提供了一系列低層轉換,即process函式,這些函式可以訪問記錄的時間戳和水印,並註冊將來某個特定時間觸發的定時器。此外,process函式還支援將記錄傳送到多個輸出流。process函式通常用於構建事件驅動的應用程式,並實現可能不適用於預定義視窗和轉換的自定義邏輯。例如,Flink的SQL支援的大多數操作符都是使用process函式實現的。

目前,Flink提供八種不同的process函式:ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction和ProcessAllWindowFunction。正如名稱所示,這些函式適用於不同的上下文中。但是,它們具有非常相似的特性。我們將通過詳細討論KeyedProcessFunction來繼續討論這些常見特性。

KeyedProcessFunction是一個非常通用的函式,可以應用於KeyedStream。對流的每個記錄呼叫該函式,並返回零條、一條或多條記錄。所有process函式都實現RichFunction介面,提供open()、close()和getRuntimeContext()方法。另外,KeyedProcessFunction[KEY, IN, OUT]還提供了以下兩個方法:

  1. processElement(v: IN, ctx: Context, out:Collector[out])為流的每個記錄呼叫。通常,通過將結果記錄傳遞給收集器(out:Collector)的方式來輸出結果記錄。上下文物件使process函式變得特殊。它提供了對時間戳和當前記錄的key以及對TimerService的訪問。此外,上下文可以將記錄傳送到side output。

  2. onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[out])是一個回撥函式,它在已註冊的定時器觸發時呼叫。timestamp引數給出觸發定時器的時間戳,收集器(out:Collector)可以將記錄輸出。OnTimerContext提供與processElement()方法的上下文物件相同的服務,並返回觸發觸發器的時間域(處理時間或事件時間)。

TimerService和Timers

Context和OnTimerContext物件的TimerService提供了以下方法:

  • currentProcessingTime(): Long 返回當前處理時間。

  • currentWatermark(): Long 返回當前水印的時間戳。

  • registerProcessingTimeTimer(timestamp:Long):Unit 為當前key註冊一個處理時間定時器。當所在執行伺服器的處理時間達到所提供的時間戳時,定時器將觸發。

  • registerEventTimeTimer(timestamp: Long):Unit 為當前key註冊一個事件時間定時器。當將水印更新為與定時器的時間戳相等或更大的時間戳時,定時器將觸發。

  • deleteProcessingTimeTimer(timestamp:Long): Unit 刪除先前為當前key註冊的處理時間定時器。如果不存在這樣的定時器,則該方法無效。

  • deleteEventTimeTimer(timestamp:Long):Unit 刪除先前為當前key註冊的事件時間定時器。如果不存在這樣的定時器,則該方法無效。

當定時器觸發時,將呼叫onTimer()回撥函式。processElement()和onTimer()方法是同步的,以防止對狀態的併發訪問和操作。

                  非key型別流上的Timer
定時器(Timer)只能在key型別流上註冊。定時器(Timer)的一個常見用例是在一段時間不活動之後清除keu狀態,或者實現基於時間的自定義視窗邏輯。要在非key型別流上使用定時器,可以使用帶有常量虛擬鍵的KeySelector來建立key型別流。注意,這將把所有資料移動到單個任務中,這樣操作符將以並行度1有效地執行。

對於每個key和時間戳,可以只註冊一個定時器(Timer),這意味著每個key可以有多個定時器(Timers),但每個時間戳只能有一個定時器。預設情況下,KeyedProcessFunction具有heap上的優先順序佇列中,所有定時器(Timers)的時間戳。但是,您可以配置RocksDB狀態後端來儲存定時器(Timer)。

定時器與函式的任何其他狀態一起被存入檢查點。如果應用程式需要從故障中恢復,則在應用程式重新啟動時過期的所有處理時間定時器將在應用程式恢復時立即觸發。對於儲存在儲存點(savepoint)中的處理時間定時器也是如此。定時器總是非同步存入檢查點,只有一個例外。如果您使用帶有增量檢查點的RocksDB狀態後端,並將定時器儲存在heap上(預設設定),則它們將同步進行存入檢查點。在這種情況下,建議不要過度使用定時器,以避免存入檢查點的過程耗時太長。

注意
為過去的時間戳註冊的定時器不會自動刪除,但也會被處理。處理時間定時器在註冊方法返回後立即觸發。事件時間定時器在處理下一個水印時觸發。

下面的程式碼展示瞭如何將KeyedProcessFunction應用於KeyedStream。該函式監視感測器的溫度,如果感測器的溫度在處理時間內1秒出現了上升,則發出警告:

val warnings = readings// key by sensor id.keyBy(_.id)// apply ProcessFunction to monitor temperatures.process(new TempIncreaseAlertFunction)

TempIncreaseAlertFunction的實現如例6-5所示。

/** Emits a warning if the temperature of a sensor* monotonically increases for 1 second (in processing time).*/class TempIncreaseAlertFunction extends KeyedProcessFunction[String,SensorReading,String]{   // stores temperature of last sensor reading   lazy val lastTemp: ValueState[Double]=getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",Types.of[Double]))   // stores timestamp of currently active timer   lazy val currentTimer: ValueState[Long] =   getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))   override def processElement(r: SensorReading,ctx: KeyedProcessFunction[String,SensorReading,String]#Context,out: Collector[String]): Unit = {       // get previous temperature       val prevTemp = lastTemp.value()       // update last temperature       lastTemp.update(r.temperature)       val curTimerTimestamp = currentTimer.value();       if (prevTemp == 0.0 || r.temperature < prevTemp) {           // temperature decreased; delete current timer           ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)           currentTimer.clear()      } else if (r.temperature > prevTemp && curTimerTimestamp == 0) {           // temperature increased and we have not set a timer yet           // set processing time timer for now + 1 second           val timerTs = ctx.timerService().currentProcessingTime()+ 1000           ctx.timerService().registerProcessingTimeTimer(timerTs)           // remember current timer           currentTimer.update(timerTs)      }  }   override def onTimer(       ts: Long,       ctx: KeyedProcessFunction[String, SensorReading,       String]#OnTimerContext,       out: Collector[String]): Unit = {       out.collect("Temperature of sensor '" + ctx.getCurrentKey +       "' monotonically increased for 1 second.")       currentTimer.clear()  }}

輸出到Side Output(側輸出)

DataStream API的大多數操作符只有一個輸出—它們產生一個具有特定資料型別的結果流。只有split操作符允許將一個流拆分為多個相同型別的流。側輸出是處理函式的一個特性,用於從可能具有不同型別的函式輸出多個流。側輸出由OutputTag[X]物件標識,其中X是結果側輸出流的型別。處理函式可以通過上下文物件將一條記錄傳送到一個或多個側輸出發。

示例6-6展示瞭如何通過側輸出的DataStream的ProcessFunction發出資料。

val monitoredReadings: DataStream[SensorReading] = readings// monitor stream for readings with freezing temperatures.process(new FreezingMonitor)// retrieve and print the freezing alarms side outputmonitoredReadings.getSideOutput(new OutputTag[String]("freezing-alarms")).print()// print the main outputreadings.print()

示例6-7顯示了FreezingMonitor函式,該函式監視感測器讀數流,並向溫度低於32°F的側輸出發出警告。

/** Emits freezing alarms to a side output for readings* with a temperature below 32F. */class FreezingMonitor extends ProcessFunction[SensorReading,SensorReading] { // define a side output tag lazy val freezingAlarmOutput:OutputTag[String]=new OutputTag[String]("freezing-alarms") override def processElement(r: SensorReading,                             ctx: ProcessFunction[SensorReading,SensorReading]#Context,    out: Collector[SensorReading]): Unit = {       // emit freezing alarm if temperature is below 32F       if (r.temperature < 32.0) {      ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")      }       // forward all readings to the regular output       out.collect(r)  }}

CoProcessFunction

對於兩個輸入流的低層操作,DataStream API還提供了CoProcessFunction。與CoFlatMapFunction類似,CoProcessFunction為每個輸入processElement1()和processElement2()提供轉換方法。與ProcessFunction類似,這兩個方法都是使用上下文物件呼叫的,該上下文物件允許訪問元素或定時器時間戳、TimerService和側輸出。CoProcessFunction還提供了一個onTimer()回撥方法。示例6-8展示瞭如何應用CoProcessFunction來合併兩個流。

// ingest sensor streamval sensorData: DataStream[SensorReading] = ...// filter switches enable forwarding of readingsval filterSwitches: DataStream[(String, Long)] = env  .fromCollection(Seq(  ("sensor_2", 10 * 1000L), // forward sensor_2 for 10   seconds  ("sensor_7", 60 * 1000L)) // forward sensor_7 for 1 minute  )val forwardedReadings = readings   // connect readings and switches  .connect(filterSwitches)   // key by sensor ids  .keyBy(_.id, _._1)   // apply filtering CoProcessFunction  .process(new ReadingFilter)

示例6-9中顯示了一個ReadingFilter函式的實現,該函式根據過濾器開關流動態過濾感測器讀數流。

class ReadingFilter extends CoProcessFunction[SensorReading,(String, Long),SensorReading] {   // switch to enable forwarding   lazy val forwardingEnabled: ValueState[Boolean] =   getRuntimeContext.getState(   new ValueStateDescriptor[Boolean]("filterSwitch",Types.of[Boolean]))   // hold timestamp of currently active disable timer   lazy val disableTimer: ValueState[Long] =   getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))   override def processElement1(       reading: SensorReading,       ctx: CoProcessFunction[SensorReading, (String, Long),       SensorReading]#Context,       out: Collector[SensorReading]): Unit =   {       // check if we may forward the reading       if (forwardingEnabled.value()) {      out.collect(reading)      }  }   override def processElement2(       switch: (String, Long),       ctx: CoProcessFunction[SensorReading, (String, Long),SensorReading]#Context,       out: Collector[SensorReading]): Unit =   {       // enable reading forwarding       forwardingEnabled.update(true)       // set disable forward timer       val timerTimestamp =ctx.timerService().currentProcessingTime() + switch._2       val curTimerTimestamp = disableTimer.value()       if (timerTimestamp > curTimerTimestamp) {       // remove current timer and register new timer       ctx.timerService().deleteEventTimeTimer(curTimerTimestamp)       ctx.timerService().registerProcessingTimeTimer(timerTimestamp)       disableTimer.update(timerTimestamp)      }  }   override def onTimer(     ts:Long,     ctx:CoProcessFunction[SensorReading,(String, Long),SensorReading]#OnTimerContext,     out:Collector[SensorReading]): Unit = {       // remove all state; forward switch will be false by default       forwardingEnabled.clear()       disableTimer.clear()  }}

視窗操作

視窗操作是流處理應用程式中的常見操作。它們支援在無界流的有限間隔上進行諸如聚合類的轉換。通常,這些間隔是使用基於時間的邏輯定義的。視窗操作符提供了一種方法來將事件分組到有限大小的buckets(桶)中,並對這些桶中的事件資料應用計算。例如,視窗操作符可以將流的事件分組到5分鐘的視窗中,並計算每個視窗已經接收了多少事件資料。

DataStream API為最常見的視窗操作提供了內建方法,並提供了非常靈活的視窗機制來自定義視窗邏輯。在本節中,我們將向您展示如何定義視窗操作符,展示DataStream API的內建視窗型別,討論可以應用於視窗的函式,最後解釋如何定義自定義視窗邏輯。

定義視窗操作

視窗操作符可以應用於key型別流或非key型別流。key型別視窗上的視窗操作符是平行計算的,而非key型別視窗是在一個執行緒中處理的。

要建立視窗操作符,您需要指定兩個視窗元件:

  1. 確定輸入流的元素如何分組到視窗中的視窗分配程式。視窗分配程式生成一個WindowedStream(如果應用於非key型別資料流,則生成AllWindowedStream)。

  2. 應用於WindowedStream(或AllWindowedStream)上,並處理分配給視窗的元素的視窗函式。

下面的程式碼演示瞭如何指定一個視窗分配程式和一個視窗函式的key型別流或非key型別流:

stream.keyBy(...).window(...) // specify the window assigner.reduce/aggregate/process(...) // specify the window function
// define a nonkeyed window-all operatorstream.windowAll(...) // specify the window assigner.reduce/aggregate/process(...) // specify the window function

在本章的其餘部分,我們只關注key型別視窗。非key型別視窗(在DataStream API中也稱為all-windows)的行為完全相同,只是它們收集所有資料並且不併行計算。

注意
請注意,可以通過提供自定義觸發器或收回器並宣告處理延遲元素的策略來自定義視窗操作符。本節後面將詳細討論自定義視窗操作符。

內建的視窗分配程式

Flink為最常見的視窗場景提供了內建的視窗分配程式。我們在這裡討論的所有分配程式都是基於時間的,並已經在“資料流上的操作”中做了介紹。基於時間的視窗分配程式根據元素的事件時間時間戳或當前處理時間向視窗分配元素。時間視窗有一個開始時間戳和一個結束時間戳。

所有內建的視窗分配程式都提供一個預設觸發器,一旦(處理或事件)時間過了視窗的末端,就會觸發視窗計算。需要注意的是,當第一個元素被分配給一個視窗時,視窗就被建立了。Flink永遠不會計算空視窗。

基於計數器的視窗
除了基於時間的視窗之外,Flink還支援基於計數器的視窗——將固定數量的元素按它們到達視窗操作符的順序分組的視窗。由於它們依賴於攝入順序,基於計數的視窗是不確定的。此外,如果在使用時沒有自定義觸發器(在某些情況下會丟棄不完整和陳舊的視窗),則會導致問題。

Flink的內建視窗分配程式建立型別為TimeWindow的視窗。此視窗型別實質上表示兩個時間戳之間的時間間隔,其中start包含開始時間點,end不包含結束時間點。此型別視窗包括檢索視窗邊界、檢查視窗是否相交以及合併重疊視窗的方法。

下面,我們將展示DataStream API的不同內建視窗分配程式,以及如何使用它們來定義視窗操作符。

TUMBLING 視窗

滾動視窗(TUMBLING 視窗)分配程式將元素放置到不重疊的、固定大小的視窗中,如圖6-1所示。

Datastream API為Tumbling事件時間視窗和處理時間視窗分別提供了兩個分配器—TumblingEventTimeWindowsTumblingProcessingTimeWindows。Tumbling視窗分配程式接收一個引數,視窗大小以時間為單位;這可以使用分配者的of(Time size)方法來指定。時間間隔可以設定為毫秒、秒、分鐘、小時或天。

下面的程式碼展示瞭如何定義事件時間和處理時間tumbling視窗的感測器資料測量流:

val sensorData: DataStream[SensorReading] = ...val avgTemp = sensorData  .keyBy(_.id)   // group readings in 1s event-time windows  .window(TumblingEventTimeWindows.of(Time.seconds(1)))  .process(new TemperatureAverager)val avgTemp = sensorData  .keyBy(_.id)   // group readings in 1s processing-time windows  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))  .process(new TemperatureAverager)

在我們的第一個DataStream API示例,“資料流上的操作”章節中,視窗定義看起來有點不同。在那裡,我們使用timeWindow(size)方法定義了一個事件時間tumbling視窗,這是window.(TumblingEventTimeWindows.of(size)) 或者 window.(TumblingProcessingTimeWindows.of(size))兩個視窗定義的快捷方式,具體取決於配置的時間特性。下面的程式碼演示瞭如何使用這個快捷方式:

val avgTemp = sensorData.keyBy(_.id)// window.(TumblingEventTimeWindows.of(size))的快捷方式.timeWindow(Time.seconds(1)).process(new TemperatureAverager)

預設情況下,tumbling視窗與epoch時間對齊,1970-01-01-00:00:00.000。例如,大小為1小時的分配程式將在00:00:00、01:00:00、02:00:00等時間定義視窗。或者,您可以指定偏移量作為分配程式中的第二個引數。下面的程式碼顯示了偏移量為15分鐘的視窗,偏移量分別從00:15:00、01:15:00、02:15:00開始,依次類推:

val avgTemp = sensorData.keyBy(_.id)// group readings in 1 hour windows with 15 min offset.window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))).process(new TemperatureAverager)
SLIDING WINDOWS(滑動視窗)

滑動視窗分配程式將元素分配給固定大小的視窗,這些視窗按指定的滑動間隔移動,如圖6-2所示。

對於滑動視窗,必須指定視窗大小和滑動間隔,以定義新視窗的啟動頻率。當滑動間隔小於視窗大小時,視窗重疊,可以將元素分配給多個視窗。如果滑動間隔比視窗大小大,一些元素可能不會被分配到任何視窗,因此可能被丟棄。

下面的程式碼展示瞭如何將感測器讀數分組到1小時大小的滑動視窗中,滑動間隔為15分鐘。每個讀數將被新增到四個視窗。DataStream API提供了事件時間和處理時間分配器,以及快捷方法,可以將時間間隔偏移量設定為視窗分配器的第三個引數:

// event-time sliding windows assignerval slidingAvgTemp = sensorData  .keyBy(_.id)   // create 1h event-time windows every 15 minutes  .window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(15)))  .process(new TemperatureAverager)// processing-time sliding windows assignerval slidingAvgTemp = sensorData  .keyBy(_.id)   // create 1h processing-time windows every 15 minutes  .window(SlidingProcessingTimeWindows.of(Time.hours(1),Time.minutes(15)))  .process(new TemperatureAverager)// sliding windows assigner using a shortcut methodval slidingAvgTemp = sensorData  .keyBy(_.id)   // shortcut for window.(SlidingEventTimeWindow.of(size,slide))  .timeWindow(Time.hours(1), Time(minutes(15)))  .process(new TemperatureAverager)
SESSION WINDOWS(會話視窗)

會話視窗分配程式將元素放入大小不同的活動的非重疊視窗中。會話視窗的邊界由不活動的間隔定義,在這些間隔中沒有接收到任何記錄。圖6-3說明了如何將元素分配給會話視窗。

以下示例演示如何將感測器讀數分組到會話視窗,其中每個會話都定義為15分鐘的不活動時間:

// event-time session windows assignerval sessionWindows = sensorData  .keyBy(_.id)   // create event-time session windows with a 15 min gap  .window(EventTimeSessionWindows.withGap(Time.minutes(15)))  .process(...)// processing-time session windows assignerval sessionWindows = sensorData  .keyBy(_.id)   // create processing-time session windows with a 15 min gap  .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))  .process(...)

由於會話視窗的開始和結束取決於接收到的元素,所以視窗分配程式不能立即將所有元素分配到正確的視窗。相反,SessionWindows assigner最初將每個傳入元素對映到它自己的視窗中,以元素的時間戳作為開始時間,會話間隔作為視窗大小。隨後,它用重疊的範圍合併所有視窗。

在視窗上應用函式

視窗函式定義了對視窗中的資料元素執行的計算。有兩種型別的函式可以用於視窗函式:

  1. 增量聚合函式(Incremental aggregation functions):在元素被新增到視窗並保持和更新單個值為視窗狀態時直接應用增量聚合函式。這些函式通常非常節省空間,並最終產生聚合值作為結果。ReduceFunction和AggregateFunction都是增量聚合函式。

  2. 完整視窗函式(Full window functions):收集視窗的所有元素,並在對所有收集的元素求值時遍歷元素列表。完整視窗函式通常需要更多的空間,但是允許比增量聚合函式更復雜的邏輯。ProcessWindowFunction是一個完整視窗函式。

在本節中,我們將討論可以應用於視窗的不同型別的函式,以便對視窗的內容執行聚合或任意計算。我們還展示瞭如何在視窗操作符中聯合應用增量聚合函式和完整視窗函式。

REDUCEFUNCTION

在討論在key型別流上執行聚合時,在“KeyedStream轉換”中引入了ReduceFunction。ReduceFunction接受相同型別的兩個值,並將它們組合成相同型別的單個值。當應用於有視窗的流時,ReduceFunction增量地聚合分配給視窗的元素。視窗只儲存聚合的當前結果—ReduceFunction的輸入(和輸出)型別的單個值。當接收到新元素時,使用新元素和從視窗狀態讀取的當前值呼叫ReduceFunction。視窗的狀態由ReduceFunction的結果代替。

在視窗上應用ReduceFunction的優點是每個視窗的狀態大小恆定且佔用空間小,而且函式介面簡單。然而,ReduceFunction的應用程式是有限的,而且通常侷限於簡單的聚合,因為輸入和輸出型別必須是相同的。

示例6-10顯示了一個reduce lambda函式,該函式每15秒計算每個感測器的最小溫度。

val minTempPerWindow: DataStream[(String, Double)] = sensorData  .map(r => (r.id, r.temperature))  .keyBy(_._1)  .timeWindow(Time.seconds(15))  .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
AGGREGATEFUNCTION

與ReduceFunction類似,AggregateFunction也增量地應用於應用於視窗的元素。此外,具有AggregateFunction的視窗操作符的狀態也由一個值組成。

雖然AggregateFunction的介面更加靈活,但是與ReduceFunction的介面相比,它的實現也更加複雜。下面的程式碼展示了AggregateFunction的介面:

public interface AggregateFunction<IN, ACC, OUT> extends  Function, Serializable {   // create a new accumulator to start a new aggregate.   ACC createAccumulator();   // add an input element to the accumulator and return the accumulator.   ACC add(IN value, ACC accumulator);   // compute the result from the accumulator and return it.   OUT getResult(ACC accumulator);   // merge two accumulators and return the result.   ACC merge(ACC a, ACC b);}

該介面定義輸入型別IN、一個型別為ACC的累加器和結果型別OUT。與ReduceFunction相反,中間資料型別和輸出型別不依賴於輸入型別。

示例6-11展示瞭如何使用聚合函式來計算每個視窗的感測器讀數的平均溫度。累加器維護一個執行的和,和一個計數,getResult()方法計算平均值。

val avgTempPerWindow: DataStream[(String, Double)] = sensorData.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15)).aggregate(new AvgTempFunction)// An AggregateFunction to compute the average tempeature per sensor.// The accumulator holds the sum of temperatures and an event count.class AvgTempFunction extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {   override def createAccumulator() = {  ("", 0.0, 0)  }   override def add(in: (String, Double), acc: (String, Double,Int)) = {  (in._1, in._2 + acc._2, 1 + acc._3)  }   override def getResult(acc: (String, Double, Int)) = {  (acc._1, acc._2 / acc._3)  }   override def merge(acc1: (String, Double, Int), acc2:(String, Double, Int)) = {  (acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)  }}
PROCESSWINDOWFUNCTION

ReduceFunction和AggregateFunction增量地應用於分配給視窗的事件。然而,有時我們需要訪問視窗的所有元素來執行更復雜的計算,例如計算視窗中值的中位數或最頻繁出現的值。對於這樣的應用程式,ReduceFunction和AggregateFunction都不合適。Flink的DataStream API提供了ProcessWindowFunction來對視窗的內容執行任意計算。

注意
Flink 1.7的DataStream API具有WindowFunction介面。WindowFunction已經被ProcessWindowFunction所取代,這裡不再討論它。

下面的程式碼顯示了ProcessWindowFunction的介面:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {   // Evaluates the window void process(KEY key,Context ctx,Iterable<IN> vals,Collector<OUT> out)throws Exception;   // Deletes any custom per-window state when the window is purged   public void clear(Context ctx) throws Exception {}   // The context holding window metadata   public abstract class Context implements Serializable {       // Returns the metadata of the window       public abstract W window();       // Returns the current processing time       public abstract long currentProcessingTime();       // Returns the current event-time watermark       public abstract long currentWatermark();       // State accessor for per-window state       public abstract KeyedStateStore windowState();       // State accessor for per-key global state       public abstract KeyedStateStore globalState();       // Emits a record to the side output identified by the OutputTag.       public abstract <X> void output(OutputTag<X> outputTag, X value);  }}

process()方法中使用了視窗key,使用迭代器訪問視窗的元素,使用收集器輸出結果。此外,該方法具有與其他處理方法類似的上下文引數。ProcessWindowFunction的上下文物件允許訪問視窗的元資料、當前處理時間和水印、用於管理每個視窗和每個key的全域性狀態的狀態儲存,以及用於輸出記錄的側輸出。

在介紹流程函式時,我們已經討論了上下文物件的一些特性,比如對當前處理的訪問以及事件時間和側輸出。然而,ProcessWindowFunction的上下文物件也提供了獨特的特性。視窗的元資料通常包含可以用作視窗識別符號的資訊,例如時間視窗的開始和結束時間戳。

另一個特性是每個視窗和每個key的全域性狀態。全域性狀態是指不侷限於任何視窗的key狀態,而perwindow狀態是指當前正在計算的視窗例項。每個視窗的狀態對於維護應該在同一視窗上的process()方法的多個呼叫之間共享的資訊是很有用的,這些呼叫可能由於配置允許的延遲或使用自定義觸發器而發生。利用每個視窗狀態的ProcessWindowFunction需要實現它的clear()方法來在清除視窗之前清除任何特定於視窗的狀態。全域性狀態可用於在同一key上的多個視窗之間共享資訊。

示例6-12將感測器讀取流分組為5秒的tumbling視窗,並使用ProcessWindowFunction計算視窗內發生的最低和最高溫度。它為每個視窗輸出一條記錄,包括視窗的開始和結束時間戳以及最小和最大溫度。

// output the lowest and highest temperature reading every 5 secondsval minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData  .keyBy(_.id)  .timeWindow(Time.seconds(5))  .process(new HighAndLowTempProcessFunction)case class MinMaxTemp(id: String, min: Double, max:Double,endTs: Long)/*** A ProcessWindowFunction that computes the lowest and highest temperature* reading per window and emits them together with the end timestamp of the window.*/class HighAndLowTempProcessFunctionextends ProcessWindowFunction[SensorReading, MinMaxTemp,String, TimeWindow] {   override def process(       key: String,       ctx: Context,       vals: Iterable[SensorReading],       out: Collector[MinMaxTemp]): Unit = {           val temps = vals.map(_.temperature)           val windowEnd = ctx.window.getEnd           out.collect(MinMaxTemp(key, temps.min, temps.max,windowEnd))  }}

在內部,由ProcessWindowFunction計算的視窗將所有分配的事件儲存在一個ListState中。通過收集所有事件並提供對視窗元資料和其他特性的訪問,ProcessWindowFunction可以處理比ReduceFunction或AggregateFunction更多的用例。但是,收集所有事件的視窗的狀態可能比元素增量聚合視窗的狀態大得多。

INCREMENTAL AGGREGATION 和 PROCESSWINDOWFUNCTION

ProcessWindowFunction是一個非常強大的視窗函式,但是您需要謹慎地使用它,因為它通常比增量聚合函式儲存更多的狀態資料。通常,需要應用於視窗的邏輯可以表示為增量聚合,但它還需要訪問視窗元資料或狀態。

如果您有增量聚合邏輯,但還需要訪問視窗元資料,則可以將執行增量聚合的ReduceFunction或AggregateFunction與提供對更多功能訪問的ProcessWindowFunction組合起來。分配給視窗的元素將被立即聚合,當視窗的觸發器觸發時,聚合的結果將被傳遞給ProcessWindowFunction。process()方法的可迭代引數將只提供單個值,即增量聚合的結果。

在DataStream API中,這是通過將ProcessWindowFunction作為reduce()或aggregate()方法的第二個引數來實現的,如下面的程式碼所示:

input.keyBy(...).timeWindow(...).reduce(   incrAggregator: ReduceFunction[IN],   function: ProcessWindowFunction[IN, OUT, K, W])
input.keyBy(...).timeWindow(...).aggregate( incrAggregator: AggregateFunction[IN, ACC, V], windowFunction: ProcessWindowFunction[V, OUT, K, W])

例子6-13和6-14中的程式碼展示瞭如何解決相同的用例(程式碼例子6-12),聯合ReduceFunction和ProcessWindowFunction,每5秒輸出最小和最大溫度感測器和每個視窗的結束時間戳。

case class MinMaxTemp(id: String, min: Double, max:Double,endTs: Long)val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData.map(r => (r.id, r.temperature, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(5)).reduce(   // incrementally compute min and max temperature  (r1: (String, Double, Double), r2: (String, Double,Double))    => {(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))},   // finalize result in ProcessWindowFunction   new AssignWindowEndProcessFunction())

如示例6-13所示,ReduceFunction和ProcessWindowFunction都是在reduce()方法呼叫中定義的。由於聚合是由ReduceFunction執行的,ProcessWindowFunction只需要將視窗結束時間戳附加到增量計算的結果中,如示例6-14所示。

classAssignWindowEndProcessFunctionextendsProcessWindowFunction[(String,Double,Double),MinMaxTemp,String,TimeWindow]{   override def process(       key: String,       ctx: Context,       minMaxIt: Iterable[(String, Double, Double)],       out: Collector[MinMaxTemp]): Unit = {           val minMax = minMaxIt.head           val windowEnd = ctx.window.getEnd           out.collect(MinMaxTemp(key, minMax._2, minMax._3,windowEnd))  }}

自定義視窗操作

使用Flink的內建視窗分配程式定義的視窗操作符可以處理許多常見的用例。然而,當您開始編寫更高階的流處理應用程式時,您可能會發現自己需要實現更復雜的視窗邏輯,比如在遇到晚到的元素時,輸出早期結果並更新其結果的視窗,或者在接收到特定記錄時啟動和結束的視窗。

DataStream API公開了定義自定義視窗操作符的介面和方法,允許您實現自己的分配程式、觸發器和回收器。與前面討論的視窗函式一起,這些元件在視窗操作符中一起工作,對視窗中的元素進行分組和處理。

當元素到達視窗操作符時,它被傳遞給WindowAssigner。WindowAssigner確定元素需要路由到哪個視窗。如果一個視窗還不存在,它將被建立。

如果視窗操作符配置了增量聚合函式,例如ReduceFunction或AggregateFunction,那麼新新增的元素將立即被聚合,結果將作為視窗的內容儲存。如果視窗操作符沒有增量聚合函式,則將新元素附加到包含所有分配元素的ListState。

每當一個元素被新增到一個視窗時,同時會被傳遞給視窗的觸發器。觸發器定義(觸發)什麼時候一個視窗被認為應該進行計算,以及什麼時候一個視窗被清除且其內容被清除。觸發器可以根據分配的元素或註冊的定時器(類似於處理函式)來決定在特定時間點計算或清除其視窗的內容。

觸發器觸發時發生的情況取決於視窗操作符的配置函式。如果僅使用增量聚合函式配置操作符,則輸出當前聚合結果。這種情況如圖6-4所示。

如果操作符只有一個完整的視窗函式,則該函式將應用於視窗的所有元素,其結果將直接輸出,如圖6-5所示。

最後,如果操作符具有增量聚合函式和完整視窗函式,則將完整視窗函式應用於聚合值並輸出結果。圖6-6描述了這種情況。

回收器(evictor)是一個可選元件,可以在呼叫ProcessWindowFunction之前或之後注入。回收器可以從視窗的內容中刪除收集到的元素。因為它必須遍歷所有元素,所以只能在沒有指定增量聚合函式的情況下使用它。

下面的程式碼展示瞭如何定義一個視窗操作符與自定義觸發器和回收器:

stream.keyBy(...).window(...) // specify the window assigner[.trigger(...)] // optional: specify the trigger[.evictor(...)] // optional: specify the evictor.reduce/aggregate/process(...) // specify the window function

雖然回收器是可選的元件,但是每個視窗操作符都需要一個觸發器來決定何時計算其視窗。為了提供簡潔的視窗操作符API,每個WindowAssigner都有一個預設觸發器,除非定義了顯式觸發器,否則將使用該觸發器。

注意
請注意,顯式指定的觸發器將覆蓋現有的觸發器,而不是對其進行補充——視窗將僅根據上次定義的觸發器進行計算。

在下面的部分中,我們將討論windows的生命週期,並介紹定義自定義視窗分配程式、觸發器和回收器的介面。

視窗的生命週期

視窗操作符在處理傳入流資料時會建立視窗、刪除視窗。如前所述,元素由WindowAssigner分配給視窗,觸發器決定何時計算視窗,視窗函式執行實際的視窗計算。在本節中,我們將討論視窗的生命週期—何時建立它、它包含哪些資訊以及何時刪除它。

當WindowAssigner將第一個元素分配給視窗時,將建立一個視窗。因此,一個視窗至少有一個元素。一個視窗由以下不同的狀態組成:

視窗內容

如果視窗操作符具有ReduceFunction或AggregateFunction,則視窗內容包含分配給視窗的元素或增量聚合的結果。

視窗物件

WindowAssigner返回零個、一個或多個視窗物件。視窗操作符根據返回的物件對元素進行分組。因此,視窗物件儲存用來區分視窗的資訊。每個視窗物件都有一個結束timestamp,它定義了視窗及其狀態可以刪除的時間點。

觸發器定時器

觸發器可以註冊定時器,以便在特定時間點呼叫—例如,計算視窗或清除其內容。這些定時器由視窗操作符維護。

觸發器中自定義的狀態

觸發器可以定義和使用自定義每個視窗和每個key的狀態。此狀態完全由觸發器控制,而不是由視窗操作符維護。

當視窗的結束時間(由視窗物件的結束時間戳定義)達到時,視窗操作符將刪除視窗。是在處理時間發生這種情況,還是在事件時間發生這種情況取決於WindowAssigner.isEventTime()方法返回的值。

當視窗被刪除時,視窗操作符自動清除視窗內容並丟棄視窗物件。不清除自定義觸發器狀態和註冊的觸發器定時器,因為此狀態對視窗操作符不透明。因此,觸發器必須在trigger .clear()方法中清除所有狀態,以防止狀態洩漏。

視窗分配程式

WindowAssigner決定將每個到達的元素分配給哪個視窗。元素可以新增到零、一個或多個視窗。下面展示的是windowAssigner介面:

public abstract class WindowAssigner<T, W extends Window> implements Serializable {   // Returns a collection of windows to which the element is assigned   public abstract Collection<W> assignWindows(T element,                                               long timestamp,                                               WindowAssignerContext context);   // Returns the default Trigger of the WindowAssigner   public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);   // Returns the TypeSerializer for the windows of this WindowAssigner public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);   // Indicates whether this assigner creates event-time windows   public abstract boolean isEventTime();   // A context that gives access to the current processing time   public abstract static class WindowAssignerContext {       // Returns the current processing time       public abstract long getCurrentProcessingTime();  }}

“WindowAssigner”的型別取決於輸入元素的型別和元素被分配到的視窗的型別。它還需要提供一個預設觸發器,如果沒有指定顯式觸發器,則使用該觸發器。例6-15中的程式碼為30秒滾動事件時間視窗建立了一個自定義的分配程式。

/** A custom window that groups events into 30-second tumblingwindows. */class ThirtySecondsWindows extends WindowAssigner[Object, TimeWindow] {   val windowSize: Long = 30 * 1000L   override def assignWindows(                   o: Object,                   ts: Long,                   ctx: WindowAssigner.WindowAssignerContext):java.util.List[TimeWindow] = {       // rounding down by 30 seconds       val startTime = ts - (ts % windowSize)       val endTime = startTime + windowSize       // emitting the corresponding time window       Collections.singletonList(new TimeWindow(startTime,endTime))  }   override def getDefaultTrigger(       env: environment.StreamExecutionEnvironment):Trigger[Object, TimeWindow] = {       EventTimeTrigger.create()  }   override def getWindowSerializer(       executionConfig: ExecutionConfig):TypeSerializer[TimeWindow] = {       new TimeWindow.Serializer  }   override def isEventTime = true}
全域性視窗分配程式
GlobalWindows分配程式將所有元素對映到同一個全域性視窗。它的預設觸發器是NeverTrigger,顧名思義,永不觸發。因此,GlobalWindows assigner需要一個自定義觸發器和一個可能的回收器來有選擇地從視窗狀態刪除元素。
GlobalWindows的結束時間戳是Long.MAX_VALUE。因此,GlobalWindows永遠不會被徹底清除。當應用於key空間不斷變化的KeyedStream時,GlobalWindows將為每個key維護一些狀態。所以請謹慎使用。

除了WindowAssigner介面之外,還有擴充套件了WindowAssigner的MergingWindowAssigner介面。MergingWindowAssigner用於需要合併現有視窗的視窗操作符。我們前面討論過的EventTimeSessionWindows assigner就是這種分配器的一個例子,它的工作方式是為每個到達的元素建立一個新視窗,然後合併重疊的視窗。

在合併視窗時,需要確保所有合併的視窗及其觸發器的狀態也已適當合併。觸發器介面提供了一個回撥方法,該方法在合併視窗時,以及合併與視窗關聯的狀態時呼叫。下一節將更詳細地討論視窗合併。

TRIGGERS(觸發器)

觸發器定義何時計算視窗並輸出視窗的結果。觸發器可以根據特定於時間或特定資料條件(如元素計數或某些接收到的元素值)中的處理情況決定是否觸發。例如,當處理時間或水印超過視窗結束邊界的時間戳時,將觸發前面討論的時間視窗的預設觸發器。

觸發器可以訪問時間屬性和定時器,並且可以使用狀態。因此,它們與處理函式一樣強大。例如,您可以實現觸發邏輯,當視窗接收到一定數量的元素時觸發,當一個具有特定值的元素被新增到視窗時觸發,或者在檢測新增元素的模式之後觸發,比如“5秒內發生兩起相同型別的事件”。自定義觸發器還可以用於計算和輸出事件時間視窗的早期結果,在水印到達視窗的結束時間戳之前。這是產生(不完整)低延遲結果的常見策略,儘管使用了保守的水印策略。

每次呼叫觸發器時,它都會生成一個TriggerResult來確定應該對視窗執行什麼操作。TriggerResult可以取以下值之一:

CONTINUE

不觸發任何操作。

FIRE(觸發)

如果視窗操作符具有ProcessWindowFunction,則呼叫該函式並輸出結果。如果視窗只有一個增量聚合函式(ReduceFunction或AggregateFunction),則輸出當前聚合結果。視窗的狀態沒有改變。

PURGE(清除)

視窗的內容被完全丟棄,包含所有元資料的視窗被刪除。另外,呼叫ProcessWindowFunction.clear()方法來清除每個視窗的所有自定義狀態。

FIRE_AND_PURGE

FIRE_AND_PURGE:首先計算視窗(FIRE),然後刪除所有狀態和元資料(PURGE)。

可能的TriggerResult值使您能夠實現複雜的視窗邏輯。自定義觸發器可以觸發多次,計算新的或更新的結果,或在滿足特定條件時清除視窗而不輸出結果。下一個程式碼塊顯示了觸發器API:

public abstract class Trigger<T, W extends Window> implements Serializable {   // Called for every element that gets added to a window   TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);   // Called when a processing-time timer fires   public abstract TriggerResult onProcessingTime(long timestamp,W window,TriggerContext ctx);   // Called when an event-time timer fires   public abstract TriggerResult onEventTime(long timestamp, W window, TriggerContext ctx);   // Returns true if this trigger supports merging of trigger state   public boolean canMerge();   // Called when several windows have been merged into one window   // and the state of the triggers needs to be merged   public void onMerge(W window, OnMergeContext ctx);   // Clears any state that the trigger might hold for the given window   // This method is called when a window is purged   public abstract void clear(W window, TriggerContext ctx);}// A context object that is given to Trigger methods to allow them// to register timer callbacks and deal with statepublic interface TriggerContext {   // Returns the current processing time   long getCurrentProcessingTime();   // Returns the current watermark time   long getCurrentWatermark();   // Registers a processing-time timer   void registerProcessingTimeTimer(long time);   // Registers an event-time timer   void registerEventTimeTimer(long time);   // Deletes a processing-time timer   void deleteProcessingTimeTimer(long time);   // Deletes an event-time timer   void deleteEventTimeTimer(long time);   // Retrieves a state object that is scoped to the window and the key of the trigger   <S extends State> S getPartitionedState(StateDescriptor<S,?> stateDescriptor);}// Extension of TriggerContext that is given to the Trigger.onMerge() methodpublic interface OnMergeContext extends TriggerContext {   // Merges per-window state of the trigger   // The state to be merged must support merging   void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);}

如您所見,觸發器API可以通過提供對時間和狀態的訪問來實現複雜的邏輯。觸發器有兩個方面需要特別注意:清理狀態和合並觸發器。

在觸發器中使用每個視窗的狀態時,需要確保在刪除視窗時正確刪除了該狀態。否則,視窗操作符會隨著時間積累越來越多的狀態,您的應用程式可能會在某個時候失敗。為了在刪除視窗時清除所有狀態,觸發器的clear()方法需要刪除每個視窗的所有自定義狀態,並使用TriggerContext物件刪除所有處理時間和事件時間計時器。無法在定時器回撥方法中清理狀態,因為在刪除視窗後不會呼叫這些方法。

如果觸發器與MergingWindowAssigner一起應用,則需要能夠在合併兩個視窗時處理這種情況。在這種情況下,還需要合併觸發器的任何自定義狀態。canMerge()宣告觸發器是否支援合併,而onMerge()方法需要實現執行合併的邏輯。如果觸發器不支援合併,則不能將其與MergingWindowAssigner組合使用。

在合併觸發器時,必須將所有自定義狀態的描述符提供給OnMergeContext物件的mergePartitionedState()方法。

注意
請注意,可合併觸發器只能使用可以自動合併的狀態原語——ListState, ReduceState, 或AggregatingState.

示例6-16顯示了一個在到達視窗結束時間之前提前觸發的觸發器。當第一個事件被分配給一個視窗時,觸發器註冊一個定時器,比當前水印早1秒。當定時器觸發時,將註冊一個新計時器。因此,觸發器最多每秒觸發一次。

/** A trigger that fires early. The trigger fires at most every second. */class OneSecondIntervalTriggerextends Trigger[SensorReading, TimeWindow] {override def onElement(r: SensorReading,                      timestamp: Long,                      window: TimeWindow,                      ctx: Trigger.TriggerContext): TriggerResult = {   // firstSeen will be false if not set yet   val firstSeen: ValueState[Boolean] =ctx.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen",classOf[Boolean]))   // register initial timer only for first element   if (!firstSeen.value()) {       // compute time for next early firing by rounding watermark to second       val t = ctx.getCurrentWatermark + (1000 -(ctx.getCurrentWatermark % 1000))       ctx.registerEventTimeTimer(t)       // register timer for the window end       ctx.registerEventTimeTimer(window.getEnd)       firstSeen.update(true)  }   // Continue. Do not evaluate per element   TriggerResult.CONTINUE}override def onEventTime(   timestamp: Long,   window: TimeWindow,   ctx: Trigger.TriggerContext): TriggerResult = {   if (timestamp == window.getEnd) {       // final evaluation and purge window state       TriggerResult.FIRE_AND_PURGE  } else {       // register next early firing timer       val t = ctx.getCurrentWatermark + (1000 -(ctx.getCurrentWatermark % 1000))       if (t < window.getEnd) {           ctx.registerEventTimeTimer(t)      }       // fire trigger to evaluate window       TriggerResult.FIRE  }}override def onProcessingTime(   timestamp: Long,   window: TimeWindow,   ctx: Trigger.TriggerContext): TriggerResult = {   // Continue. We don't use processing time timers   TriggerResult.CONTINUE}override def clear(window: TimeWindow,ctx: Trigger.TriggerContext): Unit = {   // clear trigger state   val firstSeen: ValueState[Boolean] =ctx.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen",classOf[Boolean]))   firstSeen.clear()  }}

請注意,觸發器使用自定義狀態,該狀態使用clear()方法清除。因為我們使用的是一個簡單的不可合併的ValueState,所以觸發器是不可合併的。

EVICTORS

在Flink的視窗機制中,Evictor(驅逐器)是一個可選元件。它可以在視窗函式執行之前或之後從視窗中刪除元素。

示例6-17顯示了Evictor介面。

public interface Evictor<T, W extends Window> extends Serializable {   // Optionally evicts elements. Called before windowing function.   void evictBefore(       Iterable<TimestampedValue<T>> elements,       int size,       W window,       EvictorContext evictorContext);   // Optionally evicts elements. Called after windowing function.   void evictAfter(       Iterable<TimestampedValue<T>> elements,       int size,       W window,       EvictorContext evictorContext);}// A context object that is given to Evictor methods.interface EvictorContext {   // Returns the current processing time.   long getCurrentProcessingTime();   // Returns the current event time watermark.   long getCurrentWatermark();}

在將視窗函式應用於視窗內容之前和之後分別呼叫evictBefore()和evictAfter()方法。這兩個方法都有一個Iterable引數(服務於新增到視窗的所有元素)、視窗中的元素數量(大小)引數、視窗物件和一個EvictorContext(提供對當前處理時間和水印的訪問)引數。通過呼叫可從Iterable獲得的Iterator物件上的remove()方法,可以從視窗中刪除元素。

PREAGGREGATION和EVICTORS
Evictors遍歷視窗中的元素列表。只有當視窗收集所有新增的事件而不應用ReduceFunction或AggregateFunction以增量聚合視窗內容時,才能應用它們。

Evictors通常應用在GlobalWindow上,用於對視窗進行部分清理——而不清除整個視窗狀態。

Joining Streams on Time

使用流時,一個常見需求是connect或join兩個流中的事件。Flink的DataStream API提供了兩個內建的操作符來join具有時間條件的流:間隔連線視窗連線(interval join 和 window join)。在本節中,我們將介紹這兩種操作符。

如果無法使用Flink的內建連線操作符來表達所需的連線語義,則可以使用CoProcessFunction、BroadcastProcessFunction或KeyedBroadcastProcessFunction實現自定義連線邏輯。

注意
注意,您應該使用有效的狀態訪問模式和有效的狀態清理策略來設計這樣的操作符。

Interval Join

interval join連線來自兩個具有公共key的流的事件,這兩個流之間的時間戳間隔不超過指定的時間間隔。

圖6-7顯示了兩個流的interval join:A和B,如果B事件的時間戳早於B事件的時間戳不少於一個小時,並且晚於B事件的時間戳不超過15分鐘,則將A事件和B事件進行Join。join interval是對稱的,即來自B的一個事件與來自A的所有事件連線,這些事件都比B事件早不超過15分鐘,最多晚一個小時。

interval join目前只支援事件時間並使用內部連線語義進行操作(沒有匹配事件的事件將不會被轉發)。interval join的定義如示例6-18所示。

input1
.keyBy(…)
.between(<lower-bound>,<upper-bound>)// bounds with respect to input1
.process(ProcessJoinFunction)// process pairs of matched events

join事件雙方都被傳遞到ProcessJoinFunction中。下界和上界定義為負的和正的時間間隔,例如,between(Time.hour(-1), Time.minute(15)).。只要下界小於上界,就可以任意選擇上界和下界;可以將所有A事件與所有B事件連線起來,只要B事件的時間戳比A事件多一到兩個小時。

interval join需要緩衝來自一個或兩個輸入的記錄。對於第一個輸入,所有具有大於當前水印的時間戳(即上限)的記錄都將被緩衝。對於第二個輸入,所有時間戳大於當前水印+下界的記錄將被緩衝。注意,兩個界限都可能是負的。圖6-7儲存了流A所有加入時間戳大於當前watermark-15分鐘的記錄,和流B中所有時間戳大於當前watermark---1小時的記錄。你應該意識到,interval join的儲存需求可能會大大增加,如果兩個輸入流的事件時間不同步,因為水印是由“slower”流確定。

Window Join

顧名思義,window join基於Flink的視窗機制。兩個輸入流的元素都被分配到公共視窗,並在視窗完成時聯接(或分組)。

input1.join(input2).where(...) // specify key attributes for input1.equalTo(...) // specify key attributes for input2.window(...) // specify the WindowAssigner[.trigger(...)] // optional: specify a Trigger[.evictor(...)] // optional: specify an Evictor.apply(...) // specify the JoinFunction

兩個輸入流都根據它們的key屬性成為了key流型別,公共視窗分配程式將這兩個流的事件對映到公共視窗,這意味著視窗將儲存這兩個輸入的事件。當一個視窗的定時器觸發時,對來自第一個和第二個輸入的元素的每個組合呼叫JoinFunction(向量叉積Cross Product)。還可以指定自定義觸發器和收回器。由於這兩個流的事件被對映到相同的視窗,因此觸發器和回收器的行為與常規視窗操作符完全相同。

除了連線兩個流之外,還可以使用cogroup()而不是join()來進行操作符定義,從而將兩個流聯合到一個視窗上。總體邏輯是相同的,但不是為兩個輸入的每一對事件呼叫一個JoinFunction,而是使用來自兩個輸入的元素的迭代器在每個視窗呼叫一次CoGroupFunction。

注意
需要注意的是,連線視窗化的流可能會產生意想不到的語義。例如,假設您使用配置了1小時滾動視窗的連線操作符連線兩個流。第一個輸入的元素不會與第二個輸入的元素連線,即使它們之間相差只有1秒,但是它們被分配到兩個不同的視窗。

延遲資料處理

如前所述,可以使用水印來平衡結果的完整性和結果延遲。除非您選擇一種非常保守的水印策略,以確保所有相關記錄都以高延遲為代價,否則您的應用程式很可能必須處理延遲的資料元素。

延遲元素是這樣一個元素:當它到達一個操作符時,為其提供的計算已經被執行。在事件時間視窗操作符的上下文中,如果事件到達操作符,並且視窗分配程式將其對映到已經計算過的視窗,則該事件將延遲,因為操作符的水印傳遞了視窗的結束時間戳。

DataStream API提供了處理延遲事件的不同選項:

  • 延遲事件可以簡單地刪除。

  • 延遲事件可以重定向到單獨的流。

  • 計算結果可以根據延遲事件進行更新,並且必須輸出更新。

在下面,我們將詳細討論這些選項,並展示如何將它們應用於處理函式和視窗操作符。

刪除延遲事件

處理延遲事件的最簡單方法是簡單地丟棄它們。刪除延遲事件是事件時間視窗操作符的預設行為。因此,遲到的元素不會建立新視窗。

通過比較事件的時間戳和當前水印,處理函式可以很容易地過濾掉延遲事件。

重定向延遲事件

延遲事件還可以使用側輸出特性重定向到另一個DataStream。從那裡,可以使用常規的接收函式處理或發出延遲事件。根據業務需求,後期資料稍後可以通過定期的回填過程整合到流應用程式的結果中。示例6-20展示瞭如何為延遲事件指定帶有側輸出的視窗操作符。

val readings: DataStream[SensorReading] = ???val countPer10Secs: DataStream[(String, Long, Int)] = readings.keyBy(_.id).timeWindow(Time.seconds(10))// emit late readings to a side output.sideOutputLateData(new OutputTag[SensorReading]("latereadings"))// count readings per window.process(new CountFunction())// retrieve the late events from the side output as a streamval lateStream: DataStream[SensorReading] = countPer10Secs.getSideOutput(new OutputTag[SensorReading]("late-readings"))

處理函式可以通過比較事件時間戳和當前水印並使用常規的側輸出API輸出它們來識別延遲事件。示例6-21顯示了一個ProcessFunction,它過濾來自其輸入的延遲感測器讀數,並將它們重定向到側輸出流。

val readings: DataStream[SensorReading] = ???val filteredReadings: DataStream[SensorReading] = readings.process(new LateReadingsFilter)// retrieve late readingsval lateReadings: DataStream[SensorReading] = filteredReadings.getSideOutput(new OutputTag[SensorReading]("late-readings"))/** A ProcessFunction that filters out late sensor readings and* re-directs them to a side output */class LateReadingsFilter extends ProcessFunction[SensorReading, SensorReading] {   val lateReadingsOut = new OutputTag[SensorReading]("latereadings")   override def processElement(       r: SensorReading,       ctx: ProcessFunction[SensorReading,       SensorReading]#Context,       out: Collector[SensorReading]): Unit = {       // compare record timestamp with current watermark       if (r.timestamp < ctx.timerService().currentWatermark()) {           // this is a late reading => redirect it to the side output           ctx.output(lateReadingsOut, r)      } else {           out.collect(r)      }  }}
通過延遲事件來更新結果

延遲事件在它們應該完成的計算之後到達操作符。因此,操作符輸出的結果是不完整或不準確的。另一種策略是重新計算不完整的結果並輸出更新,而不是刪除或重定向延遲事件。但是,為了能夠重新計算和更新結果,需要考慮一些問題。

支援重新計算和更新已輸出結果的操作符需要在發出第一個結果後保留計算所需的所有狀態。但是,由於操作符通常不可能永遠保留所有狀態,所以需要在某個時候清除狀態。一旦清除了某個結果的狀態,就不能再更新該結果,只能刪除或重定向延遲事件。

除了保持狀態外,下游操作符或跟隨操作符的外部系統(更新之前發出的結果)還需要能夠處理這些更新。例如,將結果和key值視窗操作符的更新寫入key值儲存的接收器操作符可以通過使用upsert寫操作用最新更新覆蓋不準確的結果來實現這一點。對於某些用例,可能還需要區分第一個結果和由於延遲事件而導致的更新。

視窗操作符API提供了一個方法來顯式宣告您期望的延遲元素。在使用事件時間視窗時,可以指定一個稱為允許延遲的附加時間段。允許延遲的視窗操作符不會在水印通過視窗的結束時間戳後刪除視窗及其狀態。相反,操作符將繼續維護允許的延遲期的完整視窗。當一個late元素在允許的延遲週期內到達時,它就像一個正常到達的元素一樣被處理並傳遞給觸發器。當水印通過視窗的結束時間戳和延遲間隔時,視窗最終被刪除,隨後的所有延遲元素被丟棄。

允許的延遲可以使用allowedLateness()方法指定,如示例6-22所示。

val readings: DataStream[SensorReading] = ???val countPer10Secs: DataStream[(String, Long, Int, String)] =readings.keyBy(_.id).timeWindow(Time.seconds(10))// process late readings for 5 additional seconds.allowedLateness(Time.seconds(5))// count readings and update results if late readings arrive.process(new UpdatingWindowCountFunction)/** A counting WindowProcessFunction that distinguishes between* first results and updates. */class UpdatingWindowCountFunction extends ProcessWindowFunction[SensorReading, (String, Long, Int, String), String,TimeWindow] {override def process(id: String,ctx: Context,elements: Iterable[SensorReading],out: Collector[(String, Long, Int, String)]): Unit = {       // count the number of readings       val cnt = elements.count(_ => true)       // state to check if this is the first evaluation of the window or not       val isUpdate = ctx.windowState.getState(       new ValueStateDescriptor[Boolean]("isUpdate",Types.of[Boolean]))       if (!isUpdate.value()) {           // first evaluation, emit first result           out.collect((id, ctx.window.getEnd, cnt, "first"))           isUpdate.update(true)      } else {           // not the first evaluation, emit an update           out.collect((id, ctx.window.getEnd, cnt, "update"))      }  }}

還可以實現處理函式來支援延遲資料。由於狀態管理始終是自定義的,並且是在處理函式中手動完成的,所以Flink不提供支援延遲資料的內建API。相反,您可以使用記錄時間戳、水印和計時器的構建塊來實現必要的邏輯。

結束語

在本章中,學習瞭如何實現按時執行的流應用程式。我們解釋瞭如何配置流應用程式的時間特徵,以及如何分配時間戳和水印。您瞭解了基於時間的操作符,包括Flink的處理函式、內建視窗和自定義視窗。我們還討論了水印的語義、如何權衡結果的完整性和結果的延遲以及處理延遲事件的策略。