Flink例項(三十三):狀態管理(四)自定義鍵控狀態(三)MapState
MapState[K, V]儲存Key-Value對。
- MapState.get(key: K)
- MapState.put(key: K, value: V)
- MapState.contains(key: K)
- MapState.remove(key: K)
例項一:
去重計算應該是資料分析業務裡面常見的指標計算,例如網站一天的訪問使用者數、廣告的點選使用者數等等,離線計算是一個全量、一次性計算的過程通常可以通過distinct的方式得到去重結果,而實時計算是一種增量、長期計算過程,我們在面對不同的場景,例如資料量的大小、計算結果精準度要求等可以使用不同的方案。
此篇介紹如何通過編碼方式實現精確去重,以一個實際場景為例:計算每個廣告每小時的點選使用者數,廣告點選日誌包含:廣告位ID、使用者裝置ID(idfa/imei/cookie)、點選時間。
實現步驟分析:
- 為了當天的資料可重現,這裡選擇事件時間也就是廣告點選時間作為每小時的視窗期劃分
- 資料分組使用廣告位ID+點選事件所屬的小時
- 選擇processFunction來實現,一個狀態用來儲存資料、另外一個狀態用來儲存對應的資料量
- 計算完成之後的資料清理,按照時間進度註冊定時器清理
廣告資料
case class AdData(id:Int,devId:String,time:Long)
分組資料
case class AdKey(id:Int,time:Long)
主流程
val env=StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val kafkaConfig=new Properties() kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092") kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1") val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig) val ds=env.addSource(consumer) .map(x=>{ val s=x.split(",") AdData(s(0).toInt,s(1),s(2).toLong) } ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)){ override def extractTimestamp(element: AdData): Long = element.time }) .keyBy(x=>{ val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0, Time.hours(1).toMilliseconds)+Time.hours(1).toMilliseconds AdKey(x.id,endTime) })
指定時間時間屬性,這裡設定允許1min的延時,可根據實際情況調整;
時間的轉換選擇TimeWindow.getWindowStartWithOffset Flink在處理window中自帶的方法,使用起來很方便,第一個引數 表示資料時間,第二個引數offset偏移量,預設為0,正常視窗劃分都是整點方式,例如從0開始劃分,這個offset就是相對於0的偏移量,第三個引數表示視窗大小,得到的結果是資料時間所屬視窗的開始時間,這裡加上了視窗大小,使用結束時間與廣告位ID作為分組的Key。
去重邏輯
自定義Distinct1ProcessFunction 繼承了KeyedProcessFunction, 方便起見使用輸出型別使用Void,這裡直接使用列印控制檯方式檢視結果,在實際中可輸出到下游做一個批量的處理然後在輸出;
定義兩個狀態:MapState,key表示devId, value表示一個隨意的值只是為了標識,該狀態表示一個廣告位在某個小時的裝置資料,如果我們使用rocksdb作為statebackend, 那麼會將mapstate中key作為rocksdb中key的一部分,mapstate中value作為rocksdb中的value, rocksdb中value 大小是有上限的,這種方式可以減少rocksdb value的大小;
另外一個ValueState,儲存當前MapState的資料量,是由於mapstate只能通過迭代方式獲得資料量大小,每次獲取都需要進行迭代,這種方式可以避免每次迭代。
class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] { var devIdState: MapState[String, Int] = _ var devIdStateDesc: MapStateDescriptor[String, Int] = _ var countState: ValueState[Long] = _ var countStateDesc: ValueStateDescriptor[Long] = _ override def open(parameters: Configuration): Unit = { devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int])) devIdState = getRuntimeContext.getMapState(devIdStateDesc) countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long])) countState = getRuntimeContext.getState(countStateDesc) } override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = { val currW=ctx.timerService().currentWatermark() if(ctx.getCurrentKey.time+1<=currW) { println("late data:" + value) return } val devId = value.devId devIdState.get(devId) match { case 1 => { //表示已經存在 } case _ => { //表示不存在 devIdState.put(devId, 1) val c = countState.value() countState.update(c + 1) //還需要註冊一個定時器 ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1) } } println(countState.value()) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = { println(timestamp + " exec clean~~~") println(countState.value()) devIdState.clear() countState.clear() } }
資料清理通過註冊定時器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示當watermark大於該小時結束時間+1就會執行清理動作,呼叫onTimer方法。
在處理邏輯裡面加了
val currW=ctx.timerService().currentWatermark() if(ctx.getCurrentKey.time+1<=currW){ println("late data:" + value) return }
主要考慮可能會存在滯後的資料比較嚴重,會影響之前的計算結果,做了一個類似window機制裡面的一個延時判斷,將延時的資料過濾掉,也可以使用OutputTag 單獨處理。