1. 程式人生 > 其它 >【Flink】Checkpoint機制

【Flink】Checkpoint機制

技術標籤:Flink

//獲得Flink環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 60s 做一次 checkpoint
env.enableCheckpointing(60000);

// 高階配置:

// checkpoint 語義設定為 EXACTLY_ONCE,這是預設語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 兩次 checkpoint 的間隔時間至少為 1 s,預設是 0,立即進行下一次 checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // checkpoint 必須在 60s 內結束,否則被丟棄,預設是 10 分鐘 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只能允許有一個 checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最多允許 checkpoint 失敗 3 次 env.getCheckpointConfig().setTolerableCheckpointFailureNumber
(3); // 當 Flink 任務取消時,保留外部儲存的 checkpoint 資訊 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 當有較新的 Savepoint 時,作業也會從 Checkpoint 處恢復 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 允許實驗性的功能:非對齊的 checkpoint,以提升效能 env.getCheckpointConfig
().enableUnalignedCheckpoints();
  1. 初始化檢查點同時設定檢查點間隔
    env.enableCheckpointing(60000);
    等同於
    CheckpointConfig config = env.getCheckpointConfig();
    Flink原始碼如下:
    在這裡插入圖片描述在這裡插入圖片描述
  2. setCheckpointTimeout,checkpoint 超時時間,預設是 10 分鐘超時,超過了超時時間就會被丟棄;
  3. setCheckpointingMode,設定 checkpoint 語義,可以設定為 EXACTLY_ONCE,表示既不重複消費也不丟資料;AT_LEAST_ONCE,表示至少消費一次,可能會重複消費;
  4. setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時間。假如設定每分鐘進行一次 checkpoint,兩次 checkpoint 間隔時間為 30s。假設某一次 checkpoint 耗時 40s,那麼理論上20s 後就要進行一次 checkpoint,但是設定了兩次 checkpoint 之間的間隔時間為 30s,所以是 30s 之後才會進行 checkpoint。另外,如果配置了該引數,那麼同時進行的 checkpoint 數量只能為 1;
  5. enableExternalizedCheckpoints,Flink 任務取消後,外部 checkpoint 資訊是否被清理。
  6. DELETE_ON_CANCELLATION,任務取消後,所有的 checkpoint 都將會被清理。只有在任務失敗後,才會被保留;
  7. RETAIN_ON_CANCELLATION,任務取消後,所有的 checkpoint 都將會被保留,需要手工清理。
  8. setPreferCheckpointForRecovery,恢復任務時,是否從最近一個比較新的 savepoint 處恢復,預設是 false;
  9. enableUnalignedCheckpoints,是否開啟試驗性的非對齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數;

Flink 1.11 對 Checkpoint 的優化:
在以前,在進行對齊的過程中,運算元是不會再接著處理資料了,一定要等到對齊動作完成之後,才能繼續對齊
在 Flink 1.11 版本中,引入了一個 Unaligned Checkpointing 的模組,主要功能是,在 barrier 到達之後,不必等待所有的輸入流的 barrier,而是繼續處理資料