Flink基礎(八):ProcessFunction API(底層 API)
阿新 • • 發佈:2020-08-03
1 簡介
我們之前學習的轉換運算元是無法訪問事件的時間戳資訊和水位線資訊的。而這在一些應用場景下,極為重要。例如 MapFunction 這樣的 map 轉換運算元就無法訪問 時間戳或者當前事件的事件時間。 基於此,DataStream API 提供了一系列的 Low-Level 轉換運算元。可以訪問時間戳、watermark 以及註冊定時事件。還可以輸出特定的一些事件,例如超時事件等。 Process Function 用來構建事件驅動的應用以及實現自定義的業務邏輯(使用之前的window 函式和轉換運算元無法實現)。例如,Flink SQL 就是使用 Process Function 實 現的。 Flink 提供了 8 個 Process Function: ProcessFunction KeyedProcessFunction CoProcessFunction ProcessJoinFunction BroadcastProcessFunction KeyedBroadcastProcessFunction ProcessWindowFunction ProcessAllWindowFunction2 KeyedProcessFunction
3TimerService 和 定時器(Timers)
val warnings = readings .keyBy(_.id) .process(new TempIncreaseAlertFunction)看一下 TempIncreaseAlertFunction 如何實現, 程式中使用了 ValueState 這樣一個狀態變數。
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] { // 儲存上一個感測器溫度值 lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", Types.of[Double]) ) // 儲存註冊的定時器的時間戳 lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("timer", Types.of[Long]) ) override def processElement(r: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = { // 取出上一次的溫度 val prevTemp = lastTemp.value() // 將當前溫度更新到上一次的溫度這個變數中 lastTemp.update(r.temperature) val curTimerTimestamp = currentTimer.value() if (prevTemp == 0.0 || r.temperature < prevTemp) { // 溫度下降或者是第一個溫度值,刪除定時器 ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp) // 清空狀態變數 currentTimer.clear() } else if (r.temperature > prevTemp && curTimerTimestamp == 0) { // 溫度上升且我們並沒有設定定時器 val timerTs = ctx.timerService().currentProcessingTime() + 1000 ctx.timerService().registerProcessingTimeTimer(timerTs) currentTimer.update(timerTs) } } override def onTimer(ts: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = { out.collect("感測器 id 為: " + ctx.getCurrentKey + "的感測器溫度值已經連續 1s 上升了。") currentTimer.clear() } }View Code
4側輸出流(SideOutput)
大部分的 DataStream API 的運算元的輸出是單一輸出,也就是某種資料型別的流。除了 split 運算元,可以將一條流分成多條流,這些流的資料型別也都相同。process function 的 side outputs 功能可以產生多條流,並且這些流的資料型別可以不一樣。一個 side output 可以定義為 OutputTag[X]物件,X 是輸出流的資料型別。process function 可以通過 Context 物件發射一個事件到一個或者多個 side outputs。 下面是一個示例程式:val monitoredReadings: DataStream[SensorReading] = readings .process(new FreezingMonitor) monitoredReadings .getSideOutput(new OutputTag[String]("freezing-alarms")) .print() readings.print()View Code 接下來我們實現 FreezingMonitor 函式,用來監控感測器溫度值,將溫度值低於32F 的溫度輸出到 side output。
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] { // 定義一個側輸出標籤 lazy val freezingAlarmOutput: OutputTag[String] = new OutputTag[String]("freezing-alarms") override def processElement(r: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { // 溫度在 32F 以下時,輸出警告資訊 if (r.temperature < 32.0) { ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}") } // 所有資料直接常規輸出到主流 out.collect(r) } }View Code