1. 程式人生 > 其它 >Flink中設定狀態的超時

Flink中設定狀態的超時

在監控中儲存某個狀態值,但是過一段時間後需要將該值清理掉,防止對業務有影響或者堆積浪費儲存空間。

flink提供了狀態超時設定。

例項如下:

  class MyFilter extends RichFilterFunction[JSONObject]{

    var dateState: ValueState[String] = _
    val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")

    override def open(parameters: Configuration): Unit = {
      val valueStateDesc 
= new ValueStateDescriptor[String]("date-state",classOf[String]) //設定狀態的超時時間以及更新時間的策略 val stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType(UpdateType.OnCreateAndWrite).build() valueStateDesc.enableTimeToLive(stateTtlConfig) dateState = getRuntimeContext.getState(valueStateDesc) } override def filter(value: JSONObject): Boolean
= { val lastPageId = value.getJSONObject("page").getString("last_page_id") //當上一個頁面id為空時 if(null == lastPageId || lastPageId.length <= 0){ //取出狀態資料 val lastDate = dateState.value() //取出今天的日期 val currDate = sdf.format(value.getString("ts"))
//判斷2個日期是否相同 if(!currDate.equals(lastDate)){ dateState.update(currDate) return true }else { return false } }else { false } } } }

這裡

StateTtlConfig.newBuilder(Time.hours(24)) //設定狀態儲存24小時

.setUpdateType(UpdateType.OnCreateAndWrite) //設定在建立或者更新記錄同時,更新超時時間。

.build()

作者:尤燈塔 出處:https://www.cnblogs.com/30go/ 本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利.