1. 程式人生 > 實用技巧 >Flink 之Window(視窗)

Flink 之Window(視窗)

知識點:

注意 window () 方法必須在 keyBy 之後才能用

window 型別 
    • 時間視窗(Time Window) 
        滾動時間視窗(tumbling window):將資料依據固定的視窗長度對資料進行切分 ;時間對齊,視窗長度固定,沒有重疊
            適用場景:適合做 BI 統計等(做每個時間段的聚合計算)
        滑動時間視窗 (sliding window):滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗 長度和滑動間隔組成 ; 視窗長度固定,可以有重疊
            適用場景:對最近一個時間段內的統計(求某介面最近 5min 的失敗率來決定是 否要報警)。
        會話
視窗 (session window):由一系列事件組合一個指定時間長度的 timeout 間隙組成,也就是 一段時間沒有接收到新資料就會生成新的視窗 ;時間無對齊 • 計數視窗(Count Window) 滾動計數視窗 滑動計數視窗 window function :定義了要對視窗中收集的資料做的計算操作 • 可以分為兩類 增量聚合函式(incremental aggregation functions) : 每條資料到來就進行計算,保持一個簡單的狀態 ; ReduceFunction, AggregateFunction

     全視窗函式(full window functions) : 先把視窗所有資料收集起來,等到計算的時候會遍歷所有資料 ; ProcessWindowFunction Window其他Api .trigger() —— 觸發器 : 定義 window 什麼時候關閉,觸發計算並輸出結果 .evictor() —— 移除器 : 定義移除某些資料的邏輯 .allowedLateness() —— 允許處理遲到的資料 .sideOutputLateData() —— 將遲到的資料放入側輸出流 .getSideOutput() —— 獲取側輸出流

1、程式碼案例1

package window

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * @author yangwj
 * @date 2021/1/7 21:45
 * @version 1.0
 */
object WindowTest {
  /**
   * 視窗:分為時間視窗和計數視窗
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    dataStream
      .map(data => (data.id,data.temperature))
      .keyBy(_._1) //按照二元組的第一個元素分組
     // .window(TumblingEventTimeWindows.of(Time.seconds(15))) //定義滾動時間視窗
//      .window(SlidingEventTimeWindows.of(Time.minutes(5),Time.seconds(5)))//滑動視窗
//      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))// 會話視窗
      .timeWindow(Time.seconds(15))//定義滾動時間視窗
//      .timeWindow(Time.minutes(5),Time.seconds(5))//滑動視窗
      //.countWindow(5)//計數視窗
      .reduce((r1,r2) => (r1._1,r1._2.min(r2._2))).print("test")
    
    env.execute(" window test")
  }
}

2、程式碼案例2

package window

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.util.Random

/**
 * @author yangwj
 * @date 2021/1/7 21:45
 * @version 1.0
 */
object WindowTest1 {
  /**
   * 視窗:分為時間視窗和計數視窗
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[SensorReading] = env.addSource(new MySensorSource)
    //每15秒統計一次溫度最小值
    val value: DataStream[(String, Double, Long)] = dataStream
      .map(data => (data.id, data.temperature, data.timestamp))
      .keyBy(_._1) //按照二元組的第一個元素分組
      // .window(TumblingEventTimeWindows.of(Time.seconds(15))) //定義滾動時間視窗
      //      .window(SlidingEventTimeWindows.of(Time.minutes(5),Time.seconds(5)))//滑動視窗
      //      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))// 會話視窗
      .timeWindow(Time.seconds(15)) //定義滾動時間視窗
      //      .timeWindow(Time.minutes(5),Time.seconds(5))//滑動視窗
      //.countWindow(5)//計數視窗
      //      .minBy(1)
      .reduce((curs, newd) => (curs._1, curs._2.min(newd._2), newd._3))

    value.print("windows")
    env.execute("reduce test")

  }
}
//自定義資料來源
class MySensorSource extends SourceFunction[SensorReading]{

  //定義一個標識位,用來表示資料來源是否正常執行發出資料
  var running :Boolean = true
  //sourceContext 傳送資料
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    //定義無線迴圈,不斷產生資料,除非被cancel
    val rand = new Random()
    var curTemp= 1.to(4).map(i => ("sensor" + i, rand.nextDouble() * 100))

    while (running){
      curTemp = curTemp.map(data =>(data._1,data._2+rand.nextGaussian()))

      val curTime = System.currentTimeMillis()
      println("輸入值:"+curTemp+",時間為:"+curTime)
      curTemp.foreach(data => sourceContext.collect(SensorReading(data._1,curTime,data._2)))
      Thread.sleep(3000)
    }
  }

  override def cancel(): Unit = false
}