
Flink Examples (38): State Management (9), Custom Operator State (4), Broadcast State (2)

Using Connected Broadcast State

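A common requirement in streaming applications is to distribute the same events to all parallel instances of an operator, and to do so in a way that can be recovered after a failure.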

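For example, one stream carries rules (such as "two consecutive temperature readings above a threshold within 5 seconds") and the other carries the events to be matched against them: a rule stream and an event stream. Every parallel instance of the operator must keep all of the rules in its operator state, so the rule stream has to be broadcast to all parallel instances.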

In Flink, this kind of state is called broadcast state. A broadcast stream can be connected to either a regular (non-keyed) DataStream or a KeyedStream.
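To make the distribution pattern concrete, here is a toy model in plain Scala (no Flink dependency) of how records reach the parallel instances of an operator. Rules are broadcast, so every instance receives all of them, while events are partitioned by key, so each event lands on exactly one instance. `BroadcastVsKeyed`, `Instance`, and `route` are hypothetical names, and the `hashCode`-modulo routing is a simplification; Flink itself assigns keys to instances through key groups.

```scala
// Hypothetical toy model of record routing; NOT the Flink API.
object BroadcastVsKeyed {
  // What one parallel instance ends up holding.
  final case class Instance(rules: Vector[String], events: Vector[String])

  def route(parallelism: Int,
            rules: Seq[String],
            events: Seq[(String, String)]): Vector[Instance] =
    Vector.tabulate(parallelism) { i =>
      Instance(
        // Broadcast: every instance receives every rule.
        rules = rules.toVector,
        // Key partitioning (simplified): an event goes only to the
        // instance that its key maps to.
        events = events.collect {
          case (key, payload) if Math.floorMod(key.hashCode, parallelism) == i => payload
        }.toVector
      )
    }
}
```

With a parallelism of 4 and two rules, all four instances hold both rules, while each event appears on exactly one instance, and events sharing a key always end up on the same instance.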

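The following example implements a temperature alert application whose thresholds can be changed at runtime; the threshold updates are delivered through a broadcast stream.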

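import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.scala._

val sensorData: DataStream[SensorReading] = ...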
val thresholds: DataStream[ThresholdUpdate] = ...
val keyedSensorData: KeyedStream[SensorReading, String] = sensorData
  .keyBy(_.id)

// the descriptor of the broadcast state
val broadcastStateDescriptor =
  new MapStateDescriptor[String, Double](
    "thresholds", classOf[String], classOf[Double])

val broadcastThresholds: BroadcastStream[ThresholdUpdate] = thresholds
  .broadcast(broadcastStateDescriptor)

// connect the keyed sensor stream with the broadcast thresholds stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
  .connect(broadcastThresholds)
  .process(new UpdatableTemperatureAlertFunction())

Applying a function with broadcast state to two streams takes three steps:

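  • Call DataStream.broadcast() with one or more MapStateDescriptor objects to create a BroadcastStream. Each descriptor defines one broadcast state.
  • Connect the BroadcastStream with a DataStream or a KeyedStream.
  • Call process() on the connected streams with a KeyedBroadcastProcessFunction (if the other stream is a KeyedStream) or a BroadcastProcessFunction (if it is a non-keyed DataStream).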
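The division of responsibilities behind these steps can be sketched as a toy model in plain Scala. In Flink, the broadcast side of the function has read-write access to the broadcast state, while the other side only gets read-only access; because every instance receives the same broadcast elements, their copies of the state stay consistent. `ToyInstance`, `onBroadcast`, `onElement`, and `ToyConnectedStreams` are hypothetical names, not Flink API, and the model ignores checkpointing.

```scala
import scala.collection.mutable

// Hypothetical toy model of one parallel instance; NOT the Flink API.
final class ToyInstance {
  // Stand-in for the broadcast state described by a MapStateDescriptor.
  private val broadcastState = mutable.Map.empty[String, Double]

  // Broadcast side (like processBroadcastElement): read-write access.
  def onBroadcast(id: String, threshold: Double): Unit =
    broadcastState.update(id, threshold)

  // Non-broadcast side (like processElement): it only sees an immutable
  // snapshot, so it cannot modify the broadcast state.
  def onElement(sensorId: String): Option[Double] = {
    val readOnly: Map[String, Double] = broadcastState.toMap
    readOnly.get(sensorId)
  }
}

object ToyConnectedStreams {
  def run(parallelism: Int, updates: Seq[(String, Double)]): Vector[ToyInstance] = {
    val instances = Vector.fill(parallelism)(new ToyInstance)
    // Every broadcast element reaches every instance. The toy delivers them in
    // the same order everywhere; Flink does not guarantee identical arrival
    // order across instances.
    for ((id, t) <- updates; inst <- instances) inst.onBroadcast(id, t)
    instances
  }
}
```

After broadcasting `s1 -> 10.0`, `s2 -> 5.0`, and `s1 -> 7.5` to three instances, each instance answers `Some(7.5)` for `s1` and `Some(5.0)` for `s2`.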

The following function implements the dynamically configurable temperature thresholds.

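import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

class UpdatableTemperatureAlertFunction()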
    extends KeyedBroadcastProcessFunction[String,
      SensorReading, ThresholdUpdate, (String, Double, Double)] {

  // the descriptor of the broadcast state; its name and types must match
  // the descriptor passed to broadcast()
  private lazy val thresholdStateDescriptor =
    new MapStateDescriptor[String, Double](
      "thresholds", classOf[String], classOf[Double])

  // the keyed state handle
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // create keyed state descriptor
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])
    // obtain the keyed state handle
    lastTempState = getRuntimeContext
      .getState[Double](lastTempDescriptor)
  }

  override def processBroadcastElement(
      update: ThresholdUpdate,
      ctx: KeyedBroadcastProcessFunction[String,
        SensorReading, ThresholdUpdate,
        (String, Double, Double)]#Context,
      out: Collector[(String, Double, Double)]): Unit = {
    // get broadcasted state handle
    val thresholds = ctx
      .getBroadcastState(thresholdStateDescriptor)

    if (update.threshold != 0.0d) {
      // configure a new threshold for the sensor
      thresholds.put(update.id, update.threshold)
    } else {
      // remove threshold for the sensor
      thresholds.remove(update.id)
    }
  }

  override def processElement(
      reading: SensorReading,
      readOnlyCtx: KeyedBroadcastProcessFunction
        [String, SensorReading, ThresholdUpdate,
        (String, Double, Double)]#ReadOnlyContext,
      out: Collector[(String, Double, Double)]): Unit = {
    // get read-only broadcast state
    val thresholds = readOnlyCtx
      .getBroadcastState(thresholdStateDescriptor)
    // check if we have a threshold
    if (thresholds.contains(reading.id)) {
      // get threshold for sensor
      val sensorThreshold: Double = thresholds.get(reading.id)

      // fetch the last temperature from keyed state; before the first update
      // the state is empty and the Scala Double reads as 0.0
      val lastTemp = lastTempState.value()
      // check if we need to emit an alert
      val tempDiff = (reading.temperature - lastTemp).abs
      if (tempDiff > sensorThreshold) {
        // temperature changed by more than the threshold (in either direction)
        out.collect((reading.id, reading.temperature, tempDiff))
      }
    }

    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }
}
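The alert logic itself can be exercised without a Flink cluster. The sketch below is a plain-Scala model of what one parallel instance of `UpdatableTemperatureAlertFunction` does: a shared map of thresholds that is updated from the broadcast side, and a per-sensor last temperature that is read and written on the keyed side. The `SensorReading` and `ThresholdUpdate` case classes are assumptions, since the article does not show their definitions, and `ToyTemperatureAlerts` is a hypothetical name. One deliberate difference: the toy keeps the last temperature in an `Option`, so a sensor's first reading never raises an alert, whereas the Flink version reads an empty `ValueState[Double]` as 0.0 and compares against that.

```scala
import scala.collection.mutable

// Hypothetical event types; the article does not show their definitions.
final case class SensorReading(id: String, timestamp: Long, temperature: Double)
final case class ThresholdUpdate(id: String, threshold: Double)

// Plain-Scala model of the alert logic for ONE parallel instance (no Flink).
final class ToyTemperatureAlerts {
  // Stand-in for the broadcast MapState: sensor id -> threshold.
  private val thresholds = mutable.Map.empty[String, Double]
  // Stand-in for the keyed ValueState "lastTemp", one entry per sensor id.
  private val lastTemp = mutable.Map.empty[String, Double]

  def processBroadcastElement(update: ThresholdUpdate): Unit =
    if (update.threshold != 0.0) thresholds.update(update.id, update.threshold)
    else thresholds.remove(update.id) // 0.0 means "remove the threshold"

  def processElement(r: SensorReading): Option[(String, Double, Double)] = {
    // Alert only if a threshold exists, a previous reading exists,
    // and the absolute change exceeds the threshold.
    val alert = for {
      threshold <- thresholds.get(r.id)
      previous <- lastTemp.get(r.id)
      diff = (r.temperature - previous).abs
      if diff > threshold
    } yield (r.id, r.temperature, diff)
    // Always remember the latest temperature, as the Flink version does.
    lastTemp.update(r.id, r.temperature)
    alert
  }
}
```

With a threshold of 5.0 for `s1` and a previous reading of 22.0, a reading of 30.0 produces the alert `("s1", 30.0, 8.0)`; sending `ThresholdUpdate("s1", 0.0)` afterwards switches alerts for that sensor off again.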