Flink例項(六十三): Flink檢查點/儲存點提交
https://blog.csdn.net/qq_33982605/article/details/106075809
目錄
檢查點生成多個
Flink的檢查點預設是生成一個,想要生成多個可以在conf/flink-conf.yaml中新增如下的引數,在這裡我設定的是讓其生成2個:state.checkpoints.num-retained: 2
如何通過檢查點重新提交?
儲存點和檢查點內部的生成演算法是一致的,工作方式也一致,但儲存點相比較檢查點有什麼不同呢?
儲存點與檢查點有什麼不同?
- 生成邏輯不同
a) 檢查點:通過程式碼進行生成
b) 儲存點:由使用者通過flink命令列或者web控制檯進行手動觸發 - 儲存的資訊不同
儲存點相比較檢查點來說儲存了更為詳細的一些元資料資訊。
檢查點在什麼情況下觸發?
例如我在另一篇博文中所描述的“重啟策略”中的例子,檢查點在作業意外失敗後會自動重啟,並能夠從儲存的檢查點路徑中自動恢復狀態,且不影響作業邏輯的準確性。
Q:由於作業重啟失敗,程式退出,我此時修改完BUG後,想要讓我的程式接著當初失敗的地方重新執行,那麼我應如何啟動程式呢?
A:讀取失敗的檢查點,然後依託它重新執行即可
sudo
-u hdfs /myflink/flink-1.7.2/bin/flink run -s
hdfs://master:8020/flink/checkpoint0/61c58bf29cbcabb8c2e3146ff4eb24b9/chk-15
-m yarn-cluster
-c flink.ceshi /opt/flink_path/sbt-solr-assembly.jar(-s後面表示的是我檢查點的路徑,該命令代表的是從我當初檢查點處“繼續”執行)
此時可以看到我該目錄下面有2個檢查點了,
“0b3f0bcca83ed8c7176a8eed06296c1a”
該檢查點是依託
“61c58bf29cbcabb8c2e3146ff4eb24b9”
檢查點的狀態生成的新檢查點
儲存點在什麼情況下觸發?
儲存點側重的是維護,即flink作業需要在人工干預的情況下進行重啟或升級,維護完畢後再從儲存點恢復到升級後的狀態。
不取消當前應用時建立儲存點
sudo -u hdfs /myflink/flink-1.7.2/bin/flink
savepoint 0b3f0bcca83ed8c7176a8eed06296c1a(注意這裡的與前面的檢查點要保持一致,因為我要對A程式的檢查點進行手動建立儲存點,而A程式對應的檢查點則是該檢查點)
-yid application_1588754764898_0023
(針對flink on yarn模式需要指定jobID)
hdfs://master:8020/flink/checkpoint03/s1
取消當前flink應用之前生成儲存點
sudo -u hdfs /myflink/flink-1.7.2/bin/flink cancel
-s hdfs://master:8020/flink/checkpoint03/s2
b5745a9e4fe87c403a05e7fc73cacee7
-yid application_1588995876125_0052
從儲存點處啟動程式
sudo -u hdfs /myflink/flink-1.7.2/bin/flink run
-s
hdfs://master:8020/flink/checkpoint03/s1/savepoint-0b3f0b-ed13f369aadc
-m yarn-cluster
-c flink.ceshi /opt/flink_path/sbt-solr-assembly.jar
Q:在我殺死A程式到我從儲存點啟動A程式的這個過程當中,我的kafka資料沒有斷過,那麼此時當我從儲存點重新啟動程式時,我的資料會丟失嗎?
A:答案是不會,因為當你在生成儲存點時,是通過檢查點進行生成的,而檢查點中是有Kafka的偏移量的,因此你kafka的資料不會丟失