Flink例項(三十一):狀態管理(二)自定義鍵控狀態(一)ValueState
阿新 • • 發佈:2020-10-10
ValueState[T]儲存單個的值,值的型別為T。
- get操作: ValueState.value()
- set操作: ValueState.update(value: T)
例項一
scala version
val sensorData: DataStream[SensorReading] = ... val keyedData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id) val alerts: DataStream[(String, Double, Double)] = keyedData .flatMap(new TemperatureAlertFunction(1.7)) class TemperatureAlertFunction(val threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] { private var lastTempState: ValueState[Double] = _ override def open(parameters: Configuration): Unit = { val lastTempDescriptor = new ValueStateDescriptor[Double]( "lastTemp", classOf[Double])lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) } override def flatMap( reading: SensorReading, out: Collector[(String, Double, Double)] ): Unit = { val lastTemp = lastTempState.value() val tempDiff = (reading.temperature - lastTemp).abs if (tempDiff > threshold) { out.collect((reading.id, reading.temperature, tempDiff)) } this.lastTempState.update(reading.temperature)} }
上面例子中的FlatMapFunction只能訪問當前處理的元素所包含的key所對應的狀態變數。
不同key對應的keyed state是相互隔離的。
- 通過RuntimeContext註冊StateDescriptor。StateDescriptor以狀態state的名字和儲存的資料型別為引數。資料型別必須指定,因為Flink需要選擇合適的序列化器。
- 在open()方法中建立state變數。注意複習之前的RichFunction相關知識。