Flink 基石、Flink Time、事件時間、Watermark水位線
阿新 • • 發佈:2022-03-20
Flink 基石、Flink Time、事件時間、Watermark水位線
目錄Flink 基石
Flink Time
事件時間
程式碼示例
package com.shujia.flink.core import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object Demo5EventTIme { def main(args: Array[String]): Unit = { /* 使用者id,事件時間 001,1647676561000 001,1647676562000 001,1647676563000 001,1647676565000 001,1647676564000 001,1647676566000 001,1647676567000 001,1647676568000 001,1647676569000 001,1647676570000 001,1647676575000 */ /** * 使用事件時間劃分視窗 * 1、設定事件模式為事件時間 * 2、指定時間欄位 */ /** * 每隔5秒統計使用者出現的次數 * */ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //這裡需要將並行度設定為1 //因為這裡存在一個時間戳對齊的問題,多並行度的時候會對不齊 //不會觸發事件時間的計算 env.setParallelism(1) //設定時間模式 //預設是處理時間 //TimeCharacteristic.EventTime -- 事件時間 //TimeCharacteristic.IngestionTime -- 接收時間 //TimeCharacteristic.ProcessingTime -- 處理時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val linesDS: DataStream[String] = env.socketTextStream("master", 8888) val eventDS: DataStream[(String, Long)] = linesDS.map(line => { val split: Array[String] = line.split(",") (split(0), split(1).toLong) }) //設定時間欄位 val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2) /** * 事件時間視窗觸發條件 * 1、視窗內有資料 * 2、最新資料的事件時間大於等於視窗的結束資料的時間 * 但是這樣會有一個問題,就是資料的事件時間是亂序的,這樣怎麼辦呢? */ val countDS: DataStream[(String, Int)] = assDS .map(kv => (kv._1, 1)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .sum(1) countDS.print() env.execute() } }
但是這樣會有一個問題,就是資料的事件時間是亂序的,這樣怎麼辦呢?
視窗如果被計算了,之後再來一條屬於這個視窗的資料會丟資料
Watermark
水位線
package com.shujia.flink.core import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time object Demo5EventTIme { def main(args: Array[String]): Unit = { /* 001,1647676561000 001,1647676562000 001,1647676563000 001,1647676565000 001,1647676564000 001,1647676566000 001,1647676567000 001,1647676568000 001,1647676569000 001,1647676570000 001,1647676575000 */ /** * 使用事件事件劃分視窗 * 1、設定事件模式為事件時間 * 2、指定時間欄位 */ /** * 每隔5秒統計使用者出現的次數 * */ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //設定時間模式 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val linesDS: DataStream[String] = env.socketTextStream("master", 8888) val eventDS: DataStream[(String, Long)] = linesDS.map(line => { val split: Array[String] = line.split(",") (split(0), split(1).toLong) }) //設定時間欄位,水位線預設等於最新資料的時間戳,水位線只增加不減少 // val assDS: DataStream[(String, Long)] = eventDS.assignAscendingTimestamps(_._2) //設定水位線和時間欄位 val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks( //執行水位線前移的時間 new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) { //指定時間戳欄位 override def extractTimestamp(element: (String, Long)): Long = element._2 } ) /** * 事件時間視窗觸發條件 * 1、視窗內有資料 * 2、最新資料的時間大於等於視窗的結束資料 * */ val countDS: DataStream[(String, Int)] = assDS .map(kv => (kv._1, 1)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .sum(1) countDS.print() env.execute() } }
學習一個新框架,會看官網很重要