1. 程式人生 > 實用技巧 >Flink例項(六十三): Flink檢查點/儲存點提交

Flink例項(六十三): Flink檢查點/儲存點提交

https://blog.csdn.net/qq_33982605/article/details/106075809

目錄

檢查點生成多個

  Flink的檢查點預設是生成一個,想要生成多個可以在conf/flink-conf.yaml中新增如下的引數,在這裡我設定的是讓其生成2個:state.checkpoints.num-retained: 2

如何通過檢查點重新提交?

  儲存點和檢查點內部的生成演算法是一致的,工作方式也一致,但儲存點相比較檢查點有什麼不同呢?

  儲存點與檢查點有什麼不同?

  1. 生成邏輯不同
    a) 檢查點:通過程式碼進行生成
    b) 儲存點:由使用者通過flink命令列或者web控制檯進行手動觸發
  2. 儲存的資訊不同
    儲存點相比較檢查點來說儲存了更為詳細的一些元資料資訊。

檢查點在什麼情況下觸發?
  例如我在另一篇博文中所描述的“重啟策略”中的例子,檢查點在作業意外失敗後會自動重啟,並能夠從儲存的檢查點路徑中自動恢復狀態,且不影響作業邏輯的準確性。

Q:由於作業重啟失敗,程式退出,我此時修改完BUG後,想要讓我的程式接著當初失敗的地方重新執行,那麼我應如何啟動程式呢?

A:讀取失敗的檢查點,然後依託它重新執行即可
sudo
-u hdfs /myflink/flink-1.7.2/bin/flink run -s
hdfs://master:8020/flink/checkpoint0/61c58bf29cbcabb8c2e3146ff4eb24b9/chk-15
-m yarn-cluster
-c flink.ceshi /opt/flink_path/sbt-solr-assembly.jar
(-s後面表示的是我檢查點的路徑,該命令代表的是從我當初檢查點處“繼續”執行)


此時可以看到我該目錄下面有2個檢查點了,
  “0b3f0bcca83ed8c7176a8eed06296c1a”
該檢查點是依託
  “61c58bf29cbcabb8c2e3146ff4eb24b9”
檢查點的狀態生成的新檢查點

儲存點在什麼情況下觸發?

  儲存點側重的是維護,即flink作業需要在人工干預的情況下進行重啟或升級,維護完畢後再從儲存點恢復到升級後的狀態。

不取消當前應用時建立儲存點

sudo -u hdfs /myflink/flink-1.7.2/bin/flink
savepoint 0b3f0bcca83ed8c7176a8eed06296c1a
(注意這裡的與前面的檢查點要保持一致,因為我要對A程式的檢查點進行手動建立儲存點,而A程式對應的檢查點則是該檢查點)


-yid application_1588754764898_0023
(針對flink on yarn模式需要指定jobID)
hdfs://master:8020/flink/checkpoint03/s1

取消當前flink應用之前生成儲存點

sudo -u hdfs /myflink/flink-1.7.2/bin/flink cancel
-s hdfs://master:8020/flink/checkpoint03/s2
b5745a9e4fe87c403a05e7fc73cacee7
-yid application_1588995876125_0052

從儲存點處啟動程式

sudo -u hdfs /myflink/flink-1.7.2/bin/flink run
-s
hdfs://master:8020/flink/checkpoint03/s1/savepoint-0b3f0b-ed13f369aadc
-m yarn-cluster
-c flink.ceshi /opt/flink_path/sbt-solr-assembly.jar

Q:在我殺死A程式到我從儲存點啟動A程式的這個過程當中,我的kafka資料沒有斷過,那麼此時當我從儲存點重新啟動程式時,我的資料會丟失嗎?

A:答案是不會,因為當你在生成儲存點時,是通過檢查點進行生成的,而檢查點中是有Kafka的偏移量的,因此你kafka的資料不會丟失