1. 程式人生 > >Flink CheckPoint狀態點恢復與savePoint機制對比剖析-Flink牛刀小試

Flink CheckPoint狀態點恢復與savePoint機制對比剖析-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

1 Flink 應用程式啟動

./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 
-c streaming.SoetWindowWordCountJavaCheckPoint(入口類)
/usr/local/install/testJar/FlinkExample-1.0-SNAPSHOT-jar-with-dependencies.jar (jar路徑)
--port 9010

2 Checkpoint 儲存與恢復

2.1 Checkpoin設定與儲存

  • 預設情況下,如果設定了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程式失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,並能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時資料記錄處理有問題,希望將整個狀態還原到4小時之前

  • Flink可以支援保留多個Checkpoint,需要在Flink的配置檔案conf/flink-conf.yaml中,新增如下配置,指定最多需要儲存Checkpoint的個數。

      state.checkpoints.num-retained: 20
    
  • 這樣設定以後就檢視對應的Checkpoint在HDFS上儲存的檔案目錄
    hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
    如果希望回退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現

2.2 Checkpoint恢復

  • 如果Flink程式異常失敗,或者最近一段時間內資料處理錯誤,我們可以將程式從某一個Checkpoint點進行恢復

  • -s 後面接的就是待恢復checkpoint的路徑。

    bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar

程式正常執行後,還會按照Checkpoint配置進行執行,繼續生成Checkpoint資料

3 SavePoint 剖析

3.1 全域性一致性快照

  • Flink通過Savepoint功能可以做到程序升級後,繼續從升級前的那個點開始執行計算,保證資料不中斷
  • 全域性,一致性快照。可以儲存資料來源offset,operator操作狀態等資訊
  • 可以從應用在過去任意做了savepoint的時刻開始繼續消費

3.2 checkpoint理論

  • 應用定時觸發,用於儲存狀態,會過期
  • 內部應用失敗重啟的時候使用

3.3 savePoint 理論

  • 使用者手動執行,是指向Checkpoint的指標,不會過期,在升級的情況下使用
  • 注意:為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦通過 uid(String) 方法手動的給運算元賦予 ID,這些 ID 將用於確定每一個運算元的狀態範圍。如果不手動給各運算元指定 ID,則會由 Flink 自動給每個運算元生成一個 ID。
  • 只要這些 ID 沒有改變就能從儲存點(savepoint)將程式恢復回來。而這些自動生成的 ID 依賴於程式的結構,並且對程式碼的更改是很敏感的。因此,強烈建議使用者手動的設定 ID。

3.4 savePoint的使用

  • 1:在flink-conf.yaml中配置Savepoint儲存位置

    不是必須設定,但是設定後,後面建立指定Job的Savepoint時,可以不用在手動執行命令時指定Savepoint的位置:

      state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
    
  • 2:觸發一個savepoint【直接觸發或者在cancel的時候觸發】

      bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid引數】
      
      bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid引數】
    
  • 3:從指定的savepoint啟動job

      bin/flink run -s savepointPath [runArgs]
    

4 結語

牛刀小試,可能寫的並不是太專業,Flink是一個新型的大資料處理引擎,資料尚不足,辛苦成文,各自珍惜,謝謝!

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

秦凱新 於深圳 201811252101