1. 程式人生 > 實用技巧 >Flink例項(三十三):狀態管理(四)自定義鍵控狀態(三)MapState

Flink例項(三十三):狀態管理(四)自定義鍵控狀態(三)MapState

MapState[K, V]儲存Key-Value對。

  • MapState.get(key: K)
  • MapState.put(key: K, value: V)
  • MapState.contains(key: K)
  • MapState.remove(key: K)

例項一:

  去重計算應該是資料分析業務裡面常見的指標計算,例如網站一天的訪問使用者數、廣告的點選使用者數等等,離線計算是一個全量、一次性計算的過程通常可以通過distinct的方式得到去重結果,而實時計算是一種增量、長期計算過程,我們在面對不同的場景,例如資料量的大小、計算結果精準度要求等可以使用不同的方案。

  此篇介紹如何通過編碼方式實現精確去重,以一個實際場景為例:計算每個廣告每小時的點選使用者數,廣告點選日誌包含:廣告位ID、使用者裝置ID(idfa/imei/cookie)、點選時間。

實現步驟分析:

  • 為了當天的資料可重現,這裡選擇事件時間也就是廣告點選時間作為每小時的視窗期劃分
  • 資料分組使用廣告位ID+點選事件所屬的小時
  • 選擇processFunction來實現,一個狀態用來儲存資料、另外一個狀態用來儲存對應的資料量
  • 計算完成之後的資料清理,按照時間進度註冊定時器清理

廣告資料

case class AdData(id:Int,devId:String,time:Long)

分組資料

case class AdKey(id:Int,time:Long)

主流程

val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val kafkaConfig
=new Properties() kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092") kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1") val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig) val ds=env.addSource(consumer) .map(x=>{ val s
=x.split(",") AdData(s(0).toInt,s(1),s(2).toLong) } ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)){ override def extractTimestamp(element: AdData): Long = element.time }) .keyBy(x=>{ val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0, Time.hours(1).toMilliseconds)+Time.hours(1).toMilliseconds AdKey(x.id,endTime) })

  指定時間時間屬性,這裡設定允許1min的延時,可根據實際情況調整;
  時間的轉換選擇TimeWindow.getWindowStartWithOffset Flink在處理window中自帶的方法,使用起來很方便,第一個引數 表示資料時間,第二個引數offset偏移量,預設為0,正常視窗劃分都是整點方式,例如從0開始劃分,這個offset就是相對於0的偏移量,第三個引數表示視窗大小,得到的結果是資料時間所屬視窗的開始時間,這裡加上了視窗大小,使用結束時間與廣告位ID作為分組的Key。

去重邏輯
  自定義Distinct1ProcessFunction 繼承了KeyedProcessFunction, 方便起見使用輸出型別使用Void,這裡直接使用列印控制檯方式檢視結果,在實際中可輸出到下游做一個批量的處理然後在輸出;
  定義兩個狀態:MapState,key表示devId, value表示一個隨意的值只是為了標識,該狀態表示一個廣告位在某個小時的裝置資料,如果我們使用rocksdb作為statebackend, 那麼會將mapstate中key作為rocksdb中key的一部分,mapstate中value作為rocksdb中的value, rocksdb中value 大小是有上限的,這種方式可以減少rocksdb value的大小;

  另外一個ValueState,儲存當前MapState的資料量,是由於mapstate只能通過迭代方式獲得資料量大小,每次獲取都需要進行迭代,這種方式可以避免每次迭代。

class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {

  var devIdState: MapState[String, Int] = _
  var devIdStateDesc: MapStateDescriptor[String, Int] = _
  var countState: ValueState[Long] = _
  var countStateDesc: ValueStateDescriptor[Long] = _
  
  override def open(parameters: Configuration): Unit = {
    devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
    devIdState = getRuntimeContext.getMapState(devIdStateDesc)
    countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
    countState = getRuntimeContext.getState(countStateDesc)
  }

  override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {
    val currW=ctx.timerService().currentWatermark()
    if(ctx.getCurrentKey.time+1<=currW) {
        println("late data:" + value)
        return
      }

    val devId = value.devId
    devIdState.get(devId) match {
      case 1 => {
        //表示已經存在
      }

      case _ => {
        //表示不存在
        devIdState.put(devId, 1)
        val c = countState.value()
        countState.update(c + 1)
        //還需要註冊一個定時器
        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)
      }
    }
    println(countState.value())
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {
    println(timestamp + " exec clean~~~")
    println(countState.value())
    devIdState.clear()
    countState.clear()
  }
}

資料清理通過註冊定時器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示當watermark大於該小時結束時間+1就會執行清理動作,呼叫onTimer方法。

在處理邏輯裡面加了

val currW=ctx.timerService().currentWatermark()
if(ctx.getCurrentKey.time+1<=currW){
        println("late data:" + value)
        return
  }

主要考慮可能會存在滯後的資料比較嚴重,會影響之前的計算結果,做了一個類似window機制裡面的一個延時判斷,將延時的資料過濾掉,也可以使用OutputTag 單獨處理。