1. 程式人生 > 其它 >flink新增水位線

flink新增水位線

flink1.11新增水位線

object UpdateWindowResultWithLateEvent {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .socketTextStream(
"db2", 9999, '\n') .map(r => { val arr = r.split(" ") (arr(0), arr(1).toLong * 1000L) }) .assignTimestampsAndWatermarks( WatermarkStrategy .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2 }) ) .keyBy(r => r._1) .timeWindow(Time.seconds(5)) .allowedLateness(Time.seconds(5)) .process(new CountWindow) stream.print() env.execute() } class
CountWindow extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = { // note that here is window state!!! only for current key and current window val isUpdate = context.windowState.getState( new ValueStateDescriptor[Boolean]("is-update", Types.of[Boolean]) ) if (!isUpdate.value()) { out.collect("first calculate window result!!!!") isUpdate.update(true) } else { out.collect("update window result!!!!") } } } }