Flink 之Window(視窗)
阿新 • • 發佈:2021-01-09
知識點:
注意 window () 方法必須在 keyBy 之後才能用 window 型別 • 時間視窗(Time Window) 滾動時間視窗(tumbling window):將資料依據固定的視窗長度對資料進行切分 ;時間對齊,視窗長度固定,沒有重疊 適用場景:適合做 BI 統計等(做每個時間段的聚合計算) 滑動時間視窗 (sliding window):滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗 長度和滑動間隔組成 ; 視窗長度固定,可以有重疊 適用場景:對最近一個時間段內的統計(求某介面最近 5min 的失敗率來決定是 否要報警)。 會話視窗 (session window):由一系列事件組合一個指定時間長度的 timeout 間隙組成,也就是 一段時間沒有接收到新資料就會生成新的視窗 ;時間無對齊 • 計數視窗(Count Window) 滾動計數視窗 滑動計數視窗 window function :定義了要對視窗中收集的資料做的計算操作 • 可以分為兩類 增量聚合函式(incremental aggregation functions) : 每條資料到來就進行計算,保持一個簡單的狀態 ; ReduceFunction, AggregateFunction
全視窗函式(full window functions) : 先把視窗所有資料收集起來,等到計算的時候會遍歷所有資料 ; ProcessWindowFunction Window其他Api .trigger() —— 觸發器 : 定義 window 什麼時候關閉,觸發計算並輸出結果 .evictor() —— 移除器 : 定義移除某些資料的邏輯 .allowedLateness() —— 允許處理遲到的資料 .sideOutputLateData() —— 將遲到的資料放入側輸出流 .getSideOutput() —— 獲取側輸出流
1、程式碼案例1
package window import com.yangwj.api.SensorReading import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time /** * @author yangwj * @date 2021/1/7 21:45 * @version 1.0 */ object WindowTest { /** * 視窗:分為時間視窗和計數視窗 * @param args */ def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt" val input: DataStream[String] = env.readTextFile(inputFile) val dataStream = input.map(data => { val arr: Array[String] = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }) dataStream .map(data => (data.id,data.temperature)) .keyBy(_._1) //按照二元組的第一個元素分組 // .window(TumblingEventTimeWindows.of(Time.seconds(15))) //定義滾動時間視窗 // .window(SlidingEventTimeWindows.of(Time.minutes(5),Time.seconds(5)))//滑動視窗 // .window(EventTimeSessionWindows.withGap(Time.seconds(5)))// 會話視窗 .timeWindow(Time.seconds(15))//定義滾動時間視窗 // .timeWindow(Time.minutes(5),Time.seconds(5))//滑動視窗 //.countWindow(5)//計數視窗 .reduce((r1,r2) => (r1._1,r1._2.min(r2._2))).print("test") env.execute(" window test") } }
2、程式碼案例2
package window import com.yangwj.api.SensorReading import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.windowing.time.Time import scala.util.Random /** * @author yangwj * @date 2021/1/7 21:45 * @version 1.0 */ object WindowTest1 { /** * 視窗:分為時間視窗和計數視窗 * @param args */ def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val dataStream: DataStream[SensorReading] = env.addSource(new MySensorSource) //每15秒統計一次溫度最小值 val value: DataStream[(String, Double, Long)] = dataStream .map(data => (data.id, data.temperature, data.timestamp)) .keyBy(_._1) //按照二元組的第一個元素分組 // .window(TumblingEventTimeWindows.of(Time.seconds(15))) //定義滾動時間視窗 // .window(SlidingEventTimeWindows.of(Time.minutes(5),Time.seconds(5)))//滑動視窗 // .window(EventTimeSessionWindows.withGap(Time.seconds(5)))// 會話視窗 .timeWindow(Time.seconds(15)) //定義滾動時間視窗 // .timeWindow(Time.minutes(5),Time.seconds(5))//滑動視窗 //.countWindow(5)//計數視窗 // .minBy(1) .reduce((curs, newd) => (curs._1, curs._2.min(newd._2), newd._3)) value.print("windows") env.execute("reduce test") } } //自定義資料來源 class MySensorSource extends SourceFunction[SensorReading]{ //定義一個標識位,用來表示資料來源是否正常執行發出資料 var running :Boolean = true //sourceContext 傳送資料 override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = { //定義無線迴圈,不斷產生資料,除非被cancel val rand = new Random() var curTemp= 1.to(4).map(i => ("sensor" + i, rand.nextDouble() * 100)) while (running){ curTemp = curTemp.map(data =>(data._1,data._2+rand.nextGaussian())) val curTime = System.currentTimeMillis() println("輸入值:"+curTemp+",時間為:"+curTime) curTemp.foreach(data => sourceContext.collect(SensorReading(data._1,curTime,data._2))) Thread.sleep(3000) } } override def cancel(): Unit = false }