Flink中設定狀態的超時
阿新 • • 發佈:2022-02-07
在監控中儲存某個狀態值,但是過一段時間後需要將該值清理掉,防止對業務有影響或者堆積浪費儲存空間。
flink提供了狀態超時設定。
例項如下:
class MyFilter extends RichFilterFunction[JSONObject]{ var dateState: ValueState[String] = _ val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd") override def open(parameters: Configuration): Unit = { val valueStateDesc= new ValueStateDescriptor[String]("date-state",classOf[String]) //設定狀態的超時時間以及更新時間的策略 val stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType(UpdateType.OnCreateAndWrite).build() valueStateDesc.enableTimeToLive(stateTtlConfig) dateState = getRuntimeContext.getState(valueStateDesc) } override def filter(value: JSONObject): Boolean= { val lastPageId = value.getJSONObject("page").getString("last_page_id") //當上一個頁面id為空時 if(null == lastPageId || lastPageId.length <= 0){ //取出狀態資料 val lastDate = dateState.value() //取出今天的日期 val currDate = sdf.format(value.getString("ts"))//判斷2個日期是否相同 if(!currDate.equals(lastDate)){ dateState.update(currDate) return true }else { return false } }else { false } } } }
這裡
StateTtlConfig.newBuilder(Time.hours(24)) //設定狀態儲存24小時
.setUpdateType(UpdateType.OnCreateAndWrite) //設定在建立或者更新記錄同時,更新超時時間。
.build()
作者:尤燈塔 出處:https://www.cnblogs.com/30go/ 本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利.