1. 程式人生 > 實用技巧 >Flink例項(三十一):狀態管理(二)自定義鍵控狀態(一)ValueState

Flink例項(三十一):狀態管理(二)自定義鍵控狀態(一)ValueState

ValueState[T]儲存單個的值,值的型別為T。

  • get操作: ValueState.value()
  • set操作: ValueState.update(value: T)

例項一

scala version

val sensorData: DataStream[SensorReading] = ...
val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMap(new TemperatureAlertFunction(1.7))

class TemperatureAlertFunction(val threshold: Double)
  extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    val lastTempDescriptor = new ValueStateDescriptor[Double](
      "lastTemp", classOf[Double])
lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
} override def flatMap( reading: SensorReading, out: Collector[(String, Double, Double)] ): Unit = { val lastTemp = lastTempState.value() val tempDiff = (reading.temperature - lastTemp).abs if (tempDiff > threshold) { out.collect((reading.id, reading.temperature, tempDiff)) } this.lastTempState.update(reading.temperature)
} }

上面例子中的FlatMapFunction只能訪問當前處理的元素所包含的key所對應的狀態變數。

不同key對應的keyed state是相互隔離的。

  • 通過RuntimeContext註冊StateDescriptor。StateDescriptor以狀態state的名字和儲存的資料型別為引數。資料型別必須指定,因為Flink需要選擇合適的序列化器。
  • 在open()方法中建立state變數。注意複習之前的RichFunction相關知識。