1. 程式人生 > 其它 >flink 自定義ProcessFunction方法

flink 自定義ProcessFunction方法

import it.bigdata.flink.study.SensorReding
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTest {
  def main(args: Array[String]): Unit 
= { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val inputStream = env.socketTextStream("10.18.35.155", 777) val dataStream = inputStream.map(data => { val agg = data.split(",") SensorReding(agg(0), agg(1).toLong, agg(2).toDouble) })
// .keyBy(_.id) // .process(new MykeyedProcessFunction) val warningSteam = dataStream .keyBy(_.id) .process(new TempIncreWaring(10000L)) warningSteam.print() env.execute("process function test") } } class TempIncreWaring(interval: Long) extends KeyedProcessFunction[String,SensorReding,String]{
//定義狀態:儲存上一個溫度值進行比較,儲存註冊定時器的時間戳用於刪除 lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double])) lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long])) override def processElement(value: SensorReding, ctx: KeyedProcessFunction[String, SensorReding, String]#Context, out: Collector[String]): Unit = { //先取出狀態 val lastTemp = lastTempState.value() val timerTs = timerTsState.value() //更新溫度 lastTempState.update(value.temperature) //當前溫度值和上次溫度進行比較 if(value.temperature>lastTemp && timerTs == 0){ //如果溫度上升,且沒有定時器,那麼註冊當前資料時間10s之後的定時器 val ts = ctx.timerService().currentProcessingTime() + interval ctx.timerService().registerProcessingTimeTimer(ts) //設定10s之後的定時器 timerTsState.update(ts) //更新狀態 }else if (value.temperature<lastTemp){ //如果溫度下降,那麼刪除定時器 ctx.timerService().deleteProcessingTimeTimer(timerTs) timerTsState.clear() //清空狀態 } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReding, String]#OnTimerContext, out: Collector[String]): Unit = { out.collect("感測器"+ctx.getCurrentKey+"的溫度,連續"+interval/1000+"秒,連續上升") timerTsState.clear() } } //keyedProcessFunction功能測試 class MykeyedProcessFunction extends KeyedProcessFunction[String,SensorReding,String]{ var myState: ValueState[Int]= _ //生命週期 override def open(parameters: Configuration): Unit = { myState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate", classOf[Int])) } override def processElement(value: SensorReding, ctx: KeyedProcessFunction[String, SensorReding, String]#Context, out: Collector[String]): Unit ={ ctx.getCurrentKey //當前key ctx.timestamp() //當前時間戳 // ctx.output() //側輸出流 ctx.timerService().currentWatermark() //當前事件事件 ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60000L) //註冊定時器(當前事件之後60秒的定時器) ctx.timerService().deleteEventTimeTimer(ctx.timestamp() + 60000L) //刪除定時器 } //定時器之後需要做的操作 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReding, String]#OnTimerContext, out: Collector[String]): Unit = super.onTimer(timestamp, ctx, out) }

資料

sensor_1,1547718101,35.8
sensor_1,1547718102,22.2
sensor_1,1547718101,55.3
sensor_1,1547718102,24.1
sensor_1,1547718103,57
sensor_1,1547718103,58
sensor_1,1547718103,59
sensor_6,1547718101,15.4
sensor_7,1547718102,6.7
sensor_10,1547718205,38.1