【Flink】Checkpoint機制
阿新 • • 發佈:2020-12-29
技術標籤:Flink
//獲得Flink環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 60s 做一次 checkpoint
env.enableCheckpointing(60000);
// 高階配置:
// checkpoint 語義設定為 EXACTLY_ONCE,這是預設語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩次 checkpoint 的間隔時間至少為 1 s,預設是 0,立即進行下一次 checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// checkpoint 必須在 60s 內結束,否則被丟棄,預設是 10 分鐘
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間只能允許有一個 checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 最多允許 checkpoint 失敗 3 次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber (3);
// 當 Flink 任務取消時,保留外部儲存的 checkpoint 資訊
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 當有較新的 Savepoint 時,作業也會從 Checkpoint 處恢復
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 允許實驗性的功能:非對齊的 checkpoint,以提升效能
env.getCheckpointConfig ().enableUnalignedCheckpoints();
- 初始化檢查點同時設定檢查點間隔
env.enableCheckpointing(60000);
等同於
CheckpointConfig config = env.getCheckpointConfig();
Flink原始碼如下:
- setCheckpointTimeout,checkpoint 超時時間,預設是 10 分鐘超時,超過了超時時間就會被丟棄;
- setCheckpointingMode,設定 checkpoint 語義,可以設定為 EXACTLY_ONCE,表示既不重複消費也不丟資料;AT_LEAST_ONCE,表示至少消費一次,可能會重複消費;
- setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時間。假如設定每分鐘進行一次 checkpoint,兩次 checkpoint 間隔時間為 30s。假設某一次 checkpoint 耗時 40s,那麼理論上20s 後就要進行一次 checkpoint,但是設定了兩次 checkpoint 之間的間隔時間為 30s,所以是 30s 之後才會進行 checkpoint。另外,如果配置了該引數,那麼同時進行的 checkpoint 數量只能為 1;
- enableExternalizedCheckpoints,Flink 任務取消後,外部 checkpoint 資訊是否被清理。
- DELETE_ON_CANCELLATION,任務取消後,所有的 checkpoint 都將會被清理。只有在任務失敗後,才會被保留;
- RETAIN_ON_CANCELLATION,任務取消後,所有的 checkpoint 都將會被保留,需要手工清理。
- setPreferCheckpointForRecovery,恢復任務時,是否從最近一個比較新的 savepoint 處恢復,預設是 false;
- enableUnalignedCheckpoints,是否開啟試驗性的非對齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數;
Flink 1.11 對 Checkpoint 的優化:
在以前,在進行對齊的過程中,運算元是不會再接著處理資料了,一定要等到對齊動作完成之後,才能繼續對齊
在 Flink 1.11 版本中,引入了一個 Unaligned Checkpointing 的模組,主要功能是,在 barrier 到達之後,不必等待所有的輸入流的 barrier,而是繼續處理資料