1. 程式人生 > 實用技巧 >Flink基礎(八):ProcessFunction API(底層 API)

Flink基礎(八):ProcessFunction API(底層 API)

1 簡介  

  我們之前學習的轉換運算元是無法訪問事件的時間戳資訊和水位線資訊的。而這在一些應用場景下,極為重要。例如 MapFunction 這樣的 map 轉換運算元就無法訪問 時間戳或者當前事件的事件時間。   基於此,DataStream API 提供了一系列的 Low-Level 轉換運算元。可以訪問時間戳、watermark 以及註冊定時事件。還可以輸出特定的一些事件,例如超時事件等。 Process Function 用來構建事件驅動的應用以及實現自定義的業務邏輯(使用之前的window 函式和轉換運算元無法實現)。例如,Flink SQL 就是使用 Process Function 實 現的。 Flink 提供了 8 個 Process Function:  ProcessFunction  KeyedProcessFunction  CoProcessFunction  ProcessJoinFunction  BroadcastProcessFunction  KeyedBroadcastProcessFunction  ProcessWindowFunction  ProcessAllWindowFunction

2 KeyedProcessFunction

  這裡我們重點介紹 KeyedProcessFunction。   KeyedProcessFunction 用來操作 KeyedStream。KeyedProcessFunction 會處理流的每一個元素,輸出為 0 個、1 個或者多個元素。所有的 Process Function 都繼承自 RichFunction 介面,所以都有 open()、close()和 getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:   processElement(v: IN, ctx: Context, out: Collector[OUT])
, 流中的每一個元素都會呼叫這個方法,呼叫結果將會放在 Collector 資料型別中輸出。Context 可以訪問元素的時間戳,元素的 key,以及 TimerService 時間服務。Context還可以將結果輸出到別的流(side outputs)。   onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一個回撥函式。當之前註冊的定時器觸發時呼叫。引數 timestamp 為定時器所設定 的觸發的時間戳。Collector 為輸出結果的集合。OnTimerContext 和processElement 的 Context 引數一樣,提供了上下文的一些資訊,例如定時器觸發的時間資訊(事件時間或者處理時間)。

3TimerService 和 定時器(Timers)

Context 和 OnTimerContext 所持有的 TimerService 物件擁有以下方法:  currentProcessingTime(): Long 返回當前處理時間  currentWatermark(): Long 返回當前 watermark 的時間戳  registerProcessingTimeTimer(timestamp: Long): Unit 會註冊當前 key 的 processing time 的定時器。當 processing time 到達定時時間時,觸發 timer。  registerEventTimeTimer(timestamp: Long): Unit 會註冊當前 key 的 event time 定時器。當水位線大於等於定時器註冊的時間時,觸發定時器執行回撥函式。  deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前註冊處理時間定 時器。如果沒有這個時間戳的定時器,則不執行。 deleteEventTimeTimer(timestamp: Long): Unit 刪除之前註冊的事件時間定時 器,如果沒有此時間戳的定時器,則不執行。 當定時器 timer 觸發時,會執行回撥函式 onTimer()。注意定時器 timer 只能在 keyed streams 上面使用。 下面舉個例子說明 KeyedProcessFunction 如何操作 KeyedStream。 需求:監控溫度感測器的溫度值,如果溫度值在一秒鐘之內(processing time)連 續上升,則報警。
val warnings = readings
.keyBy(_.id) .process(new TempIncreaseAlertFunction)
看一下 TempIncreaseAlertFunction 如何實現, 程式中使用了 ValueState 這樣一個狀態變數。
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {
 // 儲存上一個感測器溫度值
 lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
 new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
 )
 // 儲存註冊的定時器的時間戳
 lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(
 new ValueStateDescriptor[Long]("timer", Types.of[Long])
 )
 override def processElement(r: SensorReading,
 ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
 out: Collector[String]): Unit = {
 // 取出上一次的溫度
 val prevTemp = lastTemp.value()
// 將當前溫度更新到上一次的溫度這個變數中
 lastTemp.update(r.temperature)
 val curTimerTimestamp = currentTimer.value()
 if (prevTemp == 0.0 || r.temperature < prevTemp) {
 // 溫度下降或者是第一個溫度值,刪除定時器
 ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
 // 清空狀態變數
 currentTimer.clear()
 } else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
 // 溫度上升且我們並沒有設定定時器
 val timerTs = ctx.timerService().currentProcessingTime() + 1000
 ctx.timerService().registerProcessingTimeTimer(timerTs)
 currentTimer.update(timerTs)
 }
 }
 override def onTimer(ts: Long,
 ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
 out: Collector[String]): Unit = {
 out.collect("感測器 id 為: " + ctx.getCurrentKey + "的感測器溫度值已經連續 1s 上升了。")
 currentTimer.clear()
 } }
View Code

4側輸出流(SideOutput)

  大部分的 DataStream API 的運算元的輸出是單一輸出,也就是某種資料型別的流。除了 split 運算元,可以將一條流分成多條流,這些流的資料型別也都相同。process function 的 side outputs 功能可以產生多條流,並且這些流的資料型別可以不一樣。一個 side output 可以定義為 OutputTag[X]物件,X 是輸出流的資料型別。process function 可以通過 Context 物件發射一個事件到一個或者多個 side outputs。 下面是一個示例程式:
val monitoredReadings: DataStream[SensorReading] = readings
 .process(new FreezingMonitor)
monitoredReadings
 .getSideOutput(new OutputTag[String]("freezing-alarms"))
 .print()
readings.print()
View Code   接下來我們實現 FreezingMonitor 函式,用來監控感測器溫度值,將溫度值低於32F 的溫度輸出到 side output。
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
 // 定義一個側輸出標籤
 lazy val freezingAlarmOutput: OutputTag[String] =
 new OutputTag[String]("freezing-alarms")
 override def processElement(r: SensorReading,
 ctx: ProcessFunction[SensorReading, SensorReading]#Context,
 out: Collector[SensorReading]): Unit = {
 // 溫度在 32F 以下時,輸出警告資訊
 if (r.temperature < 32.0) {
 ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
 }
 // 所有資料直接常規輸出到主流
 out.collect(r)
 }
}
View Code

5CoProcessFunction

  對於兩條輸入流,DataStream API 提供了 CoProcessFunction 這樣的 low-level操作。CoProcessFunction 提供了操作每一個輸入流的方法: processElement1()和 processElement2()。   類似於 ProcessFunction,這兩種方法都通過 Context 物件來呼叫。這個 Context物件可以訪問事件資料,定時器時間戳,TimerService,以及 side outputs。 CoProcessFunction 也提供了 onTimer()回撥函式。