1. 程式人生 > 其它 >Flink實時計算-Checkpoint和Savepoin

Flink實時計算-Checkpoint和Savepoin

來源:https://zhuanlan.zhihu.com/p/79526638

前言

為了保證程式的容錯恢復以及程式啟動時其狀態恢復,幾乎所有公司的實時任務都會開啟 Checkpoint 或者觸發 Savepoint 進行狀態儲存。為了使得使用者更加理解這兩點區別,本文結合 Flink 1.9 版本,重點講述 Flink Checkpoint,Savepoint 相關概念以及注意事項,同時也會講述實時任務啟動時讀取 Kafka偏移量問題,使得使用者能夠更好的開發實時任務。

1. Checkpoint,Savepoint 異同

首先,為什麼會在文章開頭對這兩點進行介紹,因為有時候使用者在開發實時任務時,會對這兩點產生困惑,所以這裡直接開門見山對這兩點進行講解。

Flink Checkpoint 是一種容錯恢復機制。這種機制保證了實時程式執行時,即使突然遇到異常也能夠進行自我恢復。Checkpoint 對於使用者層面,是透明的,使用者會感覺程式一直在執行。Flink Checkpoint 是 Flink 自身的系統行為,使用者無法對其進行互動,使用者可以在程式啟動之前,設定好實時程式 Checkpoint 相關引數,當程式啟動之後,剩下的就全交給 Flink 自行管理。當然在某些情況,比如 Flink On Yarn 模式,某個 Container 發生 OOM 異常,這種情況程式直接變成失敗狀態,此時 Flink 程式雖然開啟 Checkpoint 也無法恢復,因為程式已經變成失敗狀態,所以此時可以藉助外部參與啟動程式,比如外部程式檢測到實時任務失敗時,從新對實時任務進行拉起。

Flink Savepoint 你可以把它當做在某個時間點程式狀態全域性映象,以後程式在進行升級,或者修改併發度等情況,還能從儲存的狀態位繼續啟動恢復。Flink Savepoint 一般儲存在 HDFS 上面,它需要使用者主動進行觸發。如果是使用者自定義開發的實時程式,比如使用DataStream進行開發,建議為每個運算元定義一個 uid,這樣我們在修改作業時,即使導致程式拓撲圖改變,由於相關運算元 uid 沒有變,那麼這些運算元還能夠繼續使用之前的狀態,如果使用者沒有定義 uid , Flink 會為每個運算元自動生成 uid,如果使用者修改了程式,可能導致之前的狀態程式不能再進行復用。

Flink Checkpoint和Savepoint對比:

  1. 概念:Checkpoint 是 自動容錯機制 ,Savepoint 程式全域性狀態映象 。
  2. 目的: Checkpoint 是程式自動容錯,快速恢復 。Savepoint是 程式修改後繼續從狀態恢復,程序升級等。
  3. 使用者互動:Checkpoint 是 Flink 系統行為 。Savepoint是使用者觸發。
  4. 狀態檔案保留策略:Checkpoint預設程式刪除,可以設定CheckpointConfig中的引數進行保留 。Savepoint會一直儲存,除非使用者刪除 。

2. Flink Checkpoint

2.1 Flink Checkpoint 原理

Flink Checkpoint 機制保證 Flink 任務執行突然失敗時,能夠從最近 Checkpoint 進行狀態恢復啟動,進行錯誤容忍。它是一種自動容錯機制,而不是具體的狀態儲存映象。Flink Checkpoint 受 Chandy-Lamport分散式快照啟發,其內部使用分散式資料流輕量級非同步快照

Checkpoint 儲存的狀態在程式取消時,預設會進行清除。Checkpoint狀態保留策略有兩種:

  1. DELETE_ON_CANCELLATION 表示當程式取消時,刪除 Checkpoint 儲存檔案。
  2. RETAIN_ON_CANCELLATION 表示當程式取消時,儲存之前的 Checkpoint 儲存檔案。使用者可以結合業務情況,設定 Checkpoint 保留模式。

預設情況下,Flink不會觸發一次 Checkpoint 當系統有其他 Checkpoint 在進行時,也就是說 Checkpoint 預設的併發為1。針對 Flink DataStream 任務,程式需要經歷從StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖四個步驟,其中在 ExecutionGraph 構建時,會初始化 CheckpointCoordinator。ExecutionGraph通過ExecutionGraphBuilder.buildGraph方法構建,在構建完時,會呼叫 ExecutionGraph 的enableCheckpointing方法建立CheckpointCoordinator。CheckpoinCoordinator 是 Flink 任務 Checkpoint 的關鍵,針對每一個 Flink 任務,都會初始化一個 CheckpointCoordinator 類,來觸發 Flink 任務 Checkpoint。下面是 Flink 任務 Checkpoint 大致流程:

Flink 會定時在任務的 Source Task 觸發 Barrier,Barrier是一種特殊的訊息事件,會隨著訊息通道流入到下游的運算元中。只有當最後 Sink 端的運算元接收到 Barrier 並確認該次 Checkpoint 完成時,該次 Checkpoint 才算完成。所以在某些運算元的 Task 有多個輸入時,會存在 Barrier 對齊時間,我們可以在Web UI上面看到各個 Task 的Barrier 對齊時間

2.2 Flink Checkpoint 語義

Flink Checkpoint 支援兩種語義:Exactly Once和At least Once,預設的 Checkpoint 模式是 Exactly Once. Exactly Once 和 At least Once 具體是針對 Flink狀態而言。具體語義含義如下:

Exactly Once含義是:保證每條資料對於 Flink 的狀態結果隻影響一次。打個比方,比如 WordCount程式,目前實時統計的 "hello" 這個單詞數為5,同時這個結果在這次 Checkpoint 成功後,儲存在了 HDFS。在下次 Checkpoint 之前, 又來2個 "hello" 單詞,突然程式遇到外部異常容錯自動回覆,從最近的 Checkpoint 點開始恢復,那麼會從單詞數 5 這個狀態開始恢復,Kafka 消費的資料點位還是狀態 5 這個時候的點位開始計算,所以即使程式遇到外部異常自我恢復,也不會影響到 Flink 狀態的結果。

At Least Once含義是:每條資料對於 Flink 狀態計算至少影響一次。比如在 WordCount 程式中,你統計到的某個單詞的單詞數可能會比真實的單詞數要大,因為同一條訊息,你可能將其計算多次。

Flink 中 Exactly Once 和 At Least Once 具體是針對 Flink 任務狀態而言的,並不是 Flink 程式對其處理一次。舉個例子,當前 Flink 任務正在做 Checkpoint,該次Checkpoint還麼有完成,該次 Checkpoint 時間端的資料其實已經進入 Flink 程式處理,只是程式狀態沒有最終儲存到遠端儲存。當程式突然遇到異常,進行容錯恢復,那麼就會從最新的 Checkpoint 進行狀態恢復重啟,上一部分還會進入 Flink 系統處理:

上圖中表示,在進行 chk-5 Checkpoint 時,突然遇到程式異常,那麼會從 chk-4 進行恢復,那麼之前chk-5 處理的資料,會再次進行處理。

Exactly Once 和 At Least Once 具體在底層實現大致相同,具體差異表現在 Barrier 對齊方式處理:

如果是 Exactly Once 模式,某個運算元的 Task 有多個輸入通道時,當其中一個輸入通道收到 Barrier 時,Flink Task 會阻塞處理該通道,其不會處理這些資料,但是會將這些資料儲存到內部快取中,一旦完成了所有輸入通道的 Barrier 對齊,才會繼續對這些資料進行消費處理。

對於 At least Once,同樣針對某個運算元的 Task 有多個輸入通道的情況下,當某個輸入通道接收到 Barrier 時,它不同於Exactly Once,At Least Once 會繼續處理接受到的資料,即使沒有完成所有輸入通道Barrier 對齊。所以使用At Least Once 不能保證資料對於狀態計算只有一次影響。

2.3 Flink Checkpoint 引數配置及建議

1. 當 Checkpoint 時間比設定的 Checkpoint 間隔時間要長時,可以設定 Checkpoint 間最小時間間隔。這樣在上次 Checkpoint 完成時,不會立馬進行下一次 Checkpoint,而是會等待一個最小時間間隔,然後在進行該次 Checkpoint。否則,每次 Checkpoint 完成時,就會立馬開始下一次 Checkpoint,系統會有很多資源消耗 Checkpoint。

2. 如果Flink狀態很大,在進行恢復時,需要從遠端儲存讀取狀態恢復,此時可能導致任務恢復很慢,可以設定 Flink Task 本地狀態恢復。任務狀態本地恢復預設沒有開啟,可以設定引數`state.backend.local-recovery`值為`true`進行啟用。

3. Checkpoint儲存數,Checkpoint 儲存數預設是1,也就是儲存最新的 Checkpoint 檔案,當進行狀態恢復時,如果最新的Checkpoint檔案不可用時(比如HDFS檔案所有副本都損壞或者其他原因),那麼狀態恢復就會失敗,如果設定 Checkpoint 儲存數2,即使最新的Checkpoint恢復失敗,那麼Flink 會回滾到之前那一次Checkpoint進行恢復。考慮到這種情況,使用者可以增加 Checkpoint 儲存數。

4. 建議設定的 Checkpoint 的間隔時間最好大於 Checkpoint 的完成時間。

下圖是不設定 Checkpoint 最小時間間隔示例圖,可以看到,系統一致在進行 Checkpoint,可能對執行的任務產生一定影響:

3. Flink Savepoint

3.1 Flink Savepoint 原理

Flink Savepoint 作為實時任務的全域性映象,其在底層使用的程式碼和Checkpoint的程式碼是一樣的,因為Savepoint可以看做 Checkpoint在特定時期的一個狀態快照。

Flink 在觸發Savepoint 或者 Checkpoint時,會根據這次觸發的型別計算出在HDFS上面的目錄:

如果型別是 Savepoint,那麼 其 HDFS 上面的目錄為:Savepoint 根目錄+savepoint-jobid前六位+隨機數字,具體如下格式:

Checkpoint 目錄為 chk-checkpoint ID,具體格式如下:

一次 Savepoint 目錄下面至少包括一個檔案,既_metadata檔案。當然如果實時任務某些運算元有狀態的話,那麼在 這次 Savepoint 目錄下面會包含一個_metadata檔案以及多個狀態資料檔案。_metadata檔案以絕對路徑的形式指向狀態檔案的指標。

社群方面,在以前的 Flink 版本,當用戶選擇不同的狀態儲存,其底層狀態儲存的二進位制格式都不相同。針對這種情況,目前 [FLIP-41](FLIP-41: Unify Binary format for Keyed State) 對於 Keyed State 使用統一的二進位制檔案進行儲存。這裡的 Keyed State 主要是針對 Savepoint 的狀態,Checkpoint 狀態的儲存可以根據具體的狀態後端進行儲存,允許狀態儲存底層格式的差異。對於 Savepoint 狀態底層格式的統一,應用的狀態可以在不同的狀態後端進行遷移,更方便應用程式的恢復。重做與狀態快照和恢復相關的抽象,當實現實現新狀態後端時,可以降低開銷,同時減少程式碼重複。

3.2 Flink Savepoint 觸發方式

Flink Savepoint 觸發方式目前有三種:

1. 使用flink savepoint命令觸發 Savepoint,其是在程式執行期間觸發 savepoint。

2. 使用flink cancel -s命令,取消作業時,並觸發 Savepoint。

3. 使用 Rest API 觸發 Savepoint,格式為:**/jobs/:jobid /savepoints**

3.3 Flink Savepoint 注意點

1. 使用flink cancel -s命令取消作業同時觸發 Savepoint 時,會有一個問題,可能存在觸發 Savepoint 失敗。比如實時程式處於異常狀態(比如 Checkpoint失敗),而此時你停止作業,同時觸發 Savepoint,這次 Savepoint 就會失敗,這種情況會導致,在實時平臺上面看到任務已經停止,但是實際實時作業在 Yarn 還在執行。針對這種情況,需要捕獲觸發 Savepoint 失敗的異常,當丟擲異常時,可以直接在 Yarn 上面 Kill 掉該任務。

2. 使用 DataStream 程式開發時,最好為每個運算元分配 `uid`,這樣即使作業拓撲圖變了,相關運算元還是能夠從之前的狀態進行恢復,預設情況下,Flink 會為每個運算元分配 `uid`,這種情況下,當你改變了程式的某些邏輯時,可能導致運算元的 `uid` 發生改變,那麼之前的狀態資料,就不能進行復用,程式在啟動的時候,就會報錯。

3. 由於 Savepoint 是程式的全域性狀態,對於某些狀態很大的實時任務,當我們觸發 Savepoint,可能會對執行著的實時任務產生影響,個人建議如果對於狀態過大的實時任務,觸發 Savepoint 的時間,不要太過頻繁。根據狀態的大小,適當的設定觸發時間

4. 當我們從 Savepoint 進行恢復時,需要檢查這次 Savepoint 目錄檔案是否可用。可能存在你上次觸發 Savepoint 沒有成功,導致 HDFS 目錄上面 Savepoint 檔案不可用或者缺少資料檔案等,這種情況下,如果在指定損壞的 Savepoint 的狀態目錄進行狀態恢復,任務會啟動不起來。