1. 程式人生 > >Flink的Checkpoint和Savepoint介紹

Flink的Checkpoint和Savepoint介紹

第一部分:Flink的Checkpoint

1. Flink Checkpoint原理介紹

Checkpoint是Flink實現容錯機制最核心的功能,它能夠根據配置週期性地基於Stream中各個Operator的狀態來生成Snapshot,從而將這些狀態資料定期持久化儲存下來,當Flink程式一旦意外崩潰時,重新執行程式時可以有選擇地從這些Snapshot進行恢復,從而修正因為故障帶來的程式資料狀態中斷。這裡,我們簡單理解一下Flink Checkpoint機制,如官網下圖所示:
在這裡插入圖片描述

Checkpoint指定觸發生成時間間隔後,每當需要觸發Checkpoint時,會向Flink程式執行時的多個分散式的Stream Source中插入一個Barrier標記,這些Barrier會根據Stream中的資料記錄一起流向下游的各個Operator。當一個Operator接收到一個Barrier時,它會暫停處理Steam中新接收到的資料記錄。因為一個Operator可能存在多個輸入的Stream,而每個Stream中都會存在對應的Barrier,該Operator要等到所有的輸入Stream中的Barrier都到達。當所有Stream中的Barrier都已經到達該Operator,這時所有的Barrier在時間上看來是同一個時刻點(表示已經對齊),在等待所有Barrier到達的過程中,Operator的Buffer中可能已經快取了一些比Barrier早到達Operator的資料記錄(Outgoing Records),這時該Operator會將資料記錄(Outgoing Records)發射(Emit)出去,作為下游Operator的輸入,最後將Barrier對應Snapshot發射(Emit)出去作為此次Checkpoint的結果資料。

2. Checkpoint的簡單設定

開啟Checkpoint功能,有兩種方式。其一是在conf/flink_conf.yaml中做系統設定;其二是針對任務再程式碼裡靈活配置。但是我個人推薦第二種方式,針對當前任務設定,設定程式碼如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設定儲存點的儲存路徑,這裡是儲存在hdfs中
env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"
)); CheckpointConfig config = env.getCheckpointConfig(); // 任務流取消和故障應保留檢查點 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 儲存點模式:exactly_once config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 觸發儲存點的時間間隔 config.setCheckpointInterval(60000);

上面呼叫enableExternalizedCheckpoints設定為ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢復到指定的Checkpoint處理。上面程式碼配置了執行Checkpointing的時間間隔為1分鐘。

3. 儲存多個Checkpoint

預設情況下,如果設定了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程式失敗時,可以從最近的這個Checkpoint來進行恢復。但是,如果我們希望保留多個Checkpoint,並能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時資料記錄處理有問題,希望將整個狀態還原到4小時之前。
Flink可以支援保留多個Checkpoint,需要在Flink的配置檔案conf/flink-conf.yaml中,新增如下配置,指定最多需要儲存Checkpoint的個數:

state.checkpoints.num-retained: 20

這樣設定以後,執行Flink Job,並檢視對應的Checkpoint在HDFS上儲存的檔案目錄,示例如下所示:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/
Found 22 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned

可見,我們配置了state.checkpoints.num-retained的值為20,只保留了最近的20個Checkpoint。如果希望會退到某個Checkpoint點,只需要指定對應的某個Checkpoint路徑即可實現。

4.從Checkpoint進行恢復

如果Flink程式異常失敗,或者最近一段時間內資料處理錯誤,我們可以將程式從某一個Checkpoint點,比如chk-860進行回放,執行如下命令:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar

程式正常執行後,還會按照Checkpoint配置進行執行,繼續生成Checkpoint資料,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
Found 6 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned

從上面我們可以看到,前面Flink Job的ID為582e17d2cc343e6c56255d111bae0191,所有的Checkpoint檔案都在以Job ID為名稱的目錄裡面,當Job停掉後,重新從某個Checkpoint點(chk-860)進行恢復時,重新生成Job ID(這裡是11bbc5d9933e4ff7e25198a760e9792e),而對應的Checkpoint編號會從該次執行基於的編號繼續連續生成:chk-861、chk-862、chk-863等等。

第二部分: Flink的Savepoint

1.Flink的Savepoint原理介紹

Savepoint會在Flink Job之外儲存自包含(self-contained)結構的Checkpoint,它使用Flink的Checkpoint機制來建立一個非增量的Snapshot,裡面包含Streaming程式的狀態,並將Checkpoint的資料儲存到外部儲存系統中。

Flink程式中包含兩種狀態資料,一種是使用者定義的狀態(User-defined State),他們是基於Flink的Transformation函式來建立或者修改得到的狀態資料;另一種是系統狀態(System State),他們是指作為Operator計算一部分的資料Buffer等狀態資料,比如在使用Window Function時,在Window內部快取Streaming資料記錄。為了能夠在建立Savepoint過程中,唯一識別對應的Operator的狀態資料,Flink提供了API來為程式中每個Operator設定ID,這樣可以在後續更新/升級程式的時候,可以在Savepoint資料中基於Operator ID來與對應的狀態資訊進行匹配,從而實現恢復。當然,如果我們不指定Operator ID,Flink也會我們自動生成對應的Operator狀態ID。
而且,強烈建議手動為每個Operator設定ID,即使未來Flink應用程式可能會改動很大,比如替換原來的Operator實現、增加新的Operator、刪除Operator等等,至少我們有可能與Savepoint中儲存的Operator狀態對應上。另外,儲存的Savepoint狀態資料,畢竟是基於當時程式及其記憶體資料結構生成的,所以如果未來Flink程式改動比較大,尤其是對應的需要操作的記憶體資料結構都變化了,可能根本就無法從原來舊的Savepoint正確地恢復。

下面,我們以Flink官網文件中給定的例子,來看下如何設定Operator ID,程式碼如下所示:

DataStream<String> stream = env.
  // 有狀態的source ID (例如:Kafka)
  .addSource(new StatefulSource())
  .uid("source-id") // source操作的ID
  .shuffle()
  // 有狀態的mapper ID
  .map(new StatefulMapper())
  .uid("mapper-id") // mapper的ID 
  // 無狀態sink列印
  .print(); // 自動生成ID

2.建立Savepoint

建立一個Savepoint,需要指定對應Savepoint目錄,有兩種方式來指定:
一種是,需要配置Savepoint的預設路徑,需要在Flink的配置檔案conf/flink-conf.yaml中,新增如下配置,設定Savepoint儲存目錄,例如如下所示:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints

另一種是,在手動執行savepoint命令的時候,指定Savepoint儲存目錄,命令格式如下所示:

bin/flink savepoint :jobId [:targetDirectory]

例如,正在執行的Flink Job對應的ID為40dcc6d2ba90f13930abce295de8d038,使用預設state.savepoints.dir配置指定的Savepoint目錄,執行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

可以看到,在目錄hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0下面生成了ID為40dcc6d2ba90f13930abce295de8d038的Job的Savepoint資料。
為正在執行的Flink Job指定一個目錄儲存Savepoint資料,執行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

可以看到,在目錄 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f下面生成了ID為40dcc6d2ba90f13930abce295de8d038的Job的Savepoint資料。

3.從Savepoint恢復

現在,我們可以停掉Job 40dcc6d2ba90f13930abce295de8d038,然後通過Savepoint命令來恢復Job執行,命令格式如下所示:

bin/flink run -s :savepointPath [:runArgs]

以上面儲存的Savepoint為例,恢復Job執行,執行如下命令:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

可以看到,啟動一個新的Flink Job,ID為cdbae3af1b7441839e7c03bab0d0eefd。

4. Savepoint目錄結構

下面,我們看一下Savepoint目錄下面儲存內容的結構,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

如上面列出的HDFS路徑中,11bbc5是Flink Job ID字串前6個字元,後面bd967f90709b是隨機生成的字串,然後savepoint-11bbc5-bd967f90709b作為儲存此次Savepoint資料的根目錄,最後savepoint-11bbc5-bd967f90709b目錄下面_metadata檔案包含了Savepoint的元資料資訊,其中序列化包含了savepoint-11bbc5-bd967f90709b目錄下面其它檔案的路徑,這些檔案內容都是序列化的狀態資訊。

參考
http://shiyanjun.cn/archives/1855.html