1. 程式人生 > 實用技巧 >Flink中State管理與恢復之CheckPoint原理及三種checkpoint使用方式對比

Flink中State管理與恢復之CheckPoint原理及三種checkpoint使用方式對比

CheckPoint

當程式出現問題需要恢復 Sate 資料的時候,只有程式提供支援才可以實現 State 的容錯。State 的容錯需要依靠 CheckPoint 機制,這樣才可以保證 Exactly-once 這種語義,但是注意,它只能保證 Flink 系統內的 Exactly-once,比如 Flink 內建支援的運算元。針對 Source和 Sink 元件,如果想要保證 Exactly-once 的話,則這些元件本身應支援這種語義。

1) CheckPoint 原理

Flink 中基於非同步輕量級的分散式快照技術提供了 Checkpoints 容錯機制,分散式快照可以將同一時間點 Task/Operator 的狀態資料全域性統一快照處理,包括前面提到的 KeyedState 和 Operator State。Flink 會在輸入的資料集上間隔性地生成 checkpoint barrier,通過柵欄(barrier)將間隔時間段內的資料劃分到相應的 checkpoint 中。每個運算元都會進行checkpoint 操作。

如下圖:

從檢查點(CheckPoint)恢復如下圖:

假如我們設定了三分鐘進行一次CheckPoint,儲存了上述所說的 chk-100 的CheckPoint狀態後,過了十秒鐘,offset已經消費到 (0,1100),pv統計結果變成了(app1,50080)(app2,10020),但是突然任務掛了,怎麼辦?
莫慌,其實很簡單,flink只需要從最近一次成功的CheckPoint儲存的offset(0,1000)處接著消費即可,當然pv值也要按照狀態裡的pv值(app1,50000)(app2,10000)進行累加,不能從(app1,50080)(app2,10020)處進行累加,因為 partition 0 offset消費到 1000時,pv統計結果為(app1,50000)(app2,10000)當然如果你想從offset (0,1100)pv(app1,50080)(app2,10020)這個狀態恢復,也是做不到的,因為那個時刻程式突然掛了,這個狀態根本沒有儲存下來。我們能做的最高效方式就是從最近一次成功的CheckPoint處恢復,也就是我一直所說的chk-100;

2) CheckPoint 引數和設定

預設情況下 Flink 不開啟檢查點的,使用者需要在程式中通過呼叫方法配置和開啟檢查點,另外還可以調整其他相關引數:

  • Checkpoint 開啟和時間間隔指定:

開啟檢查點並且指定檢查點時間間隔為 1000ms,根據實際情況自行選擇,如果狀態比較大,則建議適當增加該值。
streamEnv.enableCheckpointing(1000);

  • exactly-ance 和 at-least-once 語義選擇:

選擇 exactly-once 語義保證整個應用內端到端的資料一致性,這種情況比較適合於資料要求比較高,不允許出現丟資料或者資料重複,與此同時,Flink 的效能也相對較弱,而at-least-once 語義更適合於時廷和吞吐量要求非常高但對資料的一致性要求不高的場景。
如 下 通 過 setCheckpointingMode() 方 法 來 設 定 語 義 模 式 , 默 認 情 況 下 使 用 的 是exactly-once 模式。

streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//或者
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
  • Checkpoint 超時時間:

超時時間指定了每次 Checkpoint 執行過程中的上限時間範圍,一旦 Checkpoint 執行時間超過該閾值,Flink 將會中斷 Checkpoint 過程,並按照超時處理。該指標可以通過setCheckpointTimeout 方法設定,預設為 10 分鐘。

streamEnv.getCheckpointConfig.setCheckpointTimeout(50000)
  • 檢查點之間最小時間間隔:

該引數主要目的是設定兩個 Checkpoint 之間的最小時間間隔,防止出現例如狀態資料過大而導致 Checkpoint 執行時間過長,從而導致 Checkpoint 積壓過多,最終 Flink 應用密集地觸發 Checkpoint 操作,會佔用了大量計算資源而影響到整個應用的效能。

streamEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
  • 最大並行執行的檢查點數量:

通過 setMaxConcurrentCheckpoints()方法設定能夠最大同時執行的 Checkpoint 數量。在預設 情況下只 有一個檢查 點可以執行 ,根據用 戶指定的數 量可以同時 觸發多個Checkpoint,進而提升 Checkpoint 整體的效率。

streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

  • 是否刪除 Checkpoint 中儲存的資料:

設定為 RETAIN_ON_CANCELLATION:表示一旦 Flink 處理程式被 cancel 後,會保留CheckPoint 資料,以便根據實際需要恢復到指定的 CheckPoint。
設定為 DELETE_ON_CANCELLATION:表示一旦 Flink 處理程式被 cancel 後,會刪除CheckPoint 資料,只有 Job 執行失敗的時候才會儲存 CheckPoint

//刪除
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//保留64
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  • TolerableCheckpointFailureNumber:

設定可以容忍的檢查的失敗數,超過這個數量則系統自動關閉和停止任務。

streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)

  

儲存機制 StateBackend(狀態後端)

預設情況下,State 會儲存在 TaskManager 的記憶體中,CheckPoint 會儲存在 JobManager的記憶體中。State 和 CheckPoint 的儲存位置取決於 StateBackend 的配置。Flink 一共提供了 3 種 StateBackend 。 包 括 基 於 內 存 的 MemoryStateBackend 、 基 於 文 件 系 統 的FsStateBackend,以及基於 RockDB 作為儲存介質的 RocksDBState-Backend。

1) MemoryStateBackend


基於記憶體的狀態管理具有非常快速和高效的特點,但也具有非常多的限制,最主要的就是記憶體的容量限制,一旦儲存的狀態資料過多就會導致系統記憶體溢位等問題,從而影響整個應用的正常執行。同時如果機器出現問題,整個主機記憶體中的狀態資料都會丟失,進而無法恢復任務中的狀態資料。因此從資料安全的角度建議使用者儘可能地避免在生產環境中使用MemoryStateBackend。

streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024))

  

2) FsStateBackend


和 MemoryStateBackend 有所不同,FsStateBackend 是基於檔案系統的一種狀態管理器,這裡的檔案系統可以是本地檔案系統,也可以是 HDFS 分散式檔案系統。FsStateBackend 更適合任務狀態非常大的情況,可以使checkpoint資料大量儲存於HDFS或本地檔案,例如應用中含有時間範圍非常長的視窗計算,或 Key/valueState 狀態資料量非常大的場景。

缺點:跟MemoryStateBackend一樣,記憶體中儲存的狀態資料不宜過大

streamEnv.setStateBackend(new FsStateBackend("hdfs://mycluster/checkpoint/cp1"))

3) RocksDBStateBackend

RocksDBStateBackend 是 Flink 中內建的第三方狀態管理器,和前面的狀態管理器不同,RocksDBStateBackend 需要單獨引入相關的依賴包到工程中。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.11.2</version>
</dependency>

RocksDBStateBackend 採用非同步的方式進行狀態資料的 Snapshot,任務中的狀態資料首先被寫入本地 RockDB 中,這樣在 RockDB 僅會儲存正在進行計算的熱資料,而需要進行CheckPoint 的時候,會把本地的資料直接複製到遠端的 FileSystem 中。RocksDB 同時在記憶體及磁碟中儲存資料
與 FsStateBackend 相比,RocksDBStateBackend 在效能上要比 FsStateBackend 高一些,65主要是因為藉助於 RocksDB 在本地儲存了最新熱資料,然後通過非同步的方式再同步到檔案系統中,但 RocksDBStateBackend 和 MemoryStateBackend 相比效能就會較弱一些。RocksDB克服了 State 受記憶體限制的缺點,同時又能夠持久化到遠端檔案系統中,推薦在生產中使用。

streamEnv.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/checkpoint/cp2"))

4) 全域性配置 StateBackend

以 上 的 代 碼 都 是 單 job 配 置 狀 態 後 端 , 也 可 以 全 局 配 置 狀 態 後 端 , 需 要 修 改flink-conf.yaml 配置檔案:

state.backend: filesystem
其中:
filesystem 表示使用 FsStateBackend,
jobmanager 表示使用 MemoryStateBackend
rocksdb 表示使用 RocksDBStateBackend。
state.checkpoints.dir: hdfs://hadoop101:9000/checkpoints

預設情況下,如果設定了 CheckPoint 選項,則 Flink 只保留最近成功生成的 1 個CheckPoint,而當 Flink 程式失敗時,可以通過最近的 CheckPoint 來進行恢復。但是,如果希望保留多個 CheckPoint,並能夠根據實際需要選擇其中一個進行恢復,就會更加靈活。
新增如下配置,指定最多可以儲存的 CheckPoint 的個數。

state.checkpoints.num-retained: 2