1. 程式人生 > 其它 >Flink 基石、Flink Time、事件時間、Watermark水位線

Flink 基石、Flink Time、事件時間、Watermark水位線

Flink 基石、Flink Time、事件時間、Watermark水位線

目錄

事件時間

程式碼示例

package com.shujia.flink.core

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5EventTIme {
  def main(args: Array[String]): Unit = {

    /*
    使用者id,事件時間
    001,1647676561000
    001,1647676562000
    001,1647676563000
    001,1647676565000
    001,1647676564000
    001,1647676566000
    001,1647676567000
    001,1647676568000
    001,1647676569000
    001,1647676570000
    001,1647676575000
     */

    /**
      * 使用事件時間劃分視窗
      * 1、設定事件模式為事件時間
      * 2、指定時間欄位
      */

    /**
      * 每隔5秒統計使用者出現的次數
      *
      */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //這裡需要將並行度設定為1
    //因為這裡存在一個時間戳對齊的問題,多並行度的時候會對不齊
    //不會觸發事件時間的計算
    env.setParallelism(1)

    //設定時間模式
    //預設是處理時間
    //TimeCharacteristic.EventTime -- 事件時間
    //TimeCharacteristic.IngestionTime -- 接收時間
    //TimeCharacteristic.ProcessingTime -- 處理時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //設定時間欄位
    val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)

    /**
      * 事件時間視窗觸發條件
      * 1、視窗內有資料
      * 2、最新資料的事件時間大於等於視窗的結束資料的時間
      * 但是這樣會有一個問題,就是資料的事件時間是亂序的,這樣怎麼辦呢?
      */

    val countDS: DataStream[(String, Int)] = assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

    env.execute()

  }
}

但是這樣會有一個問題,就是資料的事件時間是亂序的,這樣怎麼辦呢?

視窗如果被計算了,之後再來一條屬於這個視窗的資料會丟資料

Watermark

水位線

package com.shujia.flink.core

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5EventTIme {
  def main(args: Array[String]): Unit = {

    /*
    001,1647676561000
    001,1647676562000
    001,1647676563000
    001,1647676565000
    001,1647676564000
    001,1647676566000
    001,1647676567000
    001,1647676568000
    001,1647676569000
    001,1647676570000
    001,1647676575000
     */

    /**
      * 使用事件事件劃分視窗
      * 1、設定事件模式為事件時間
      * 2、指定時間欄位
      */

    /**
      * 每隔5秒統計使用者出現的次數
      *
      */

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //設定時間模式
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toLong)
    })

    //設定時間欄位,水位線預設等於最新資料的時間戳,水位線只增加不減少
    //    val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2)

    //設定水位線和時間欄位
    val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
      //執行水位線前移的時間
      new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
        //指定時間戳欄位
        override def extractTimestamp(element: (String, Long)): Long = element._2
      }
    )

    /**
      * 事件時間視窗觸發條件
      * 1、視窗內有資料
      * 2、最新資料的時間大於等於視窗的結束資料
      *
      */

    val countDS: DataStream[(String, Int)] = assDS
      .map(kv => (kv._1, 1))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countDS.print()

    env.execute()

  }
}

學習一個新框架,會看官網很重要