Flink Checkpoint機制原理剖析與引數配置
在Flink狀態管理詳解這篇文章中,我們介紹了Flink的狀態都是基於本地的,而Flink又是一個部署在多節點的分散式引擎,分散式系統經常出現程序被殺、節點宕機或網路中斷等問題,那麼本地的狀態在遇到故障時如何保證不丟呢?Flink定期儲存狀態資料到儲存上,故障發生後從之前的備份中恢復,整個被稱為Checkpoint機制,它為Flink提供了Exactly-Once的投遞保障。本文將介紹Flink的Checkpoint機制的原理。本文會使用多個概念:快照(Snapshot)、分散式快照(Distributed Snapshot)、檢查點(Checkpoint)等,這些概念均指的是Flink的Checkpoint機制,讀者可以將這些概念等同看待。
http://weixin.qq.com/r/0Uzn--PEJK5brbec9xnD(二維碼自動識別)
Flink分散式快照流程
首先我們來看一下一個簡單的Checkpoint的大致流程:
- 暫停處理新流入資料,將新資料快取起來。
- 將運算元子任務的本地狀態資料拷貝到一個遠端的持久化儲存上。
- 繼續處理新流入的資料,包括剛才快取起來的資料。
Flink是在Chandy–Lamport演算法[1]的基礎上實現的一種分散式快照演算法。在介紹Flink的快照詳細流程前,我們先要了解一下檢查點分界線(Checkpoint Barrier)的概念。如下圖所示,Checkpoint Barrier被插入到資料流中,它將資料流切分成段。Flink的Checkpoint邏輯是,一段新資料流入導致狀態發生了變化,Flink的運算元接收到Checpoint Barrier後,對狀態進行快照。每個Checkpoint Barrier有一個ID,表示該段資料屬於哪次Checkpoint。如圖所示,當ID為n的Checkpoint Barrier到達每個運算元後,表示要對n-1和n之間狀態的更新做快照。Checkpoint Barrier有點像Event Time中的Watermark,它被插入到資料流中,但並不影響資料流原有的處理順序。
接下來,我們構建一個並行資料流圖,用這個並行資料流圖來演示Flink的分散式快照機制。這個資料流圖有兩個Source子任務,資料流會在這些並行運算元上從Source流動到Sink。
首先,Flink的檢查點協調器(Checkpoint Coordinator)觸發一次Checkpoint(Trigger Checkpoint),這個請求會發送給Source的各個子任務。
各Source運算元子任務接收到這個Checkpoint請求之後,會將自己的狀態寫入到狀態後端,生成一次快照,並且會向下遊廣播Checkpoint Barrier。
Source運算元做完快照後,還會給Checkpoint Coodinator傳送一個確認,告知自己已經做完了相應的工作。這個確認中包括了一些元資料,其中就包括剛才備份到State Backend的狀態控制代碼,或者說是指向狀態的指標。至此,Source完成了一次Checkpoint。跟Watermark的傳播一樣,一個運算元子任務要把Checkpoint Barrier傳送給所連線的所有下游運算元子任務。
對於下游運算元來說,可能有多個與之相連的上游輸入,我們將運算元之間的邊稱為通道。Source要將一個ID為n的Checkpoint Barrier向所有下游運算元廣播,這也意味著下游運算元的多個輸入裡都有同一個Checkpoint Barrier,而且不同輸入裡Checkpoint Barrier的流入進度可能不同。Checkpoint Barrier傳播的過程需要進行對齊(Barrier Alignment),我們從資料流圖中擷取一小部分來分析Checkpoint Barrier是如何在運算元間傳播和對齊的。
如上圖所示,對齊分為四步:
- 運算元子任務在某個輸入通道中收到第一個ID為n的Checkpoint Barrier,但是其他輸入通道中ID為n的Checkpoint Barrier還未到達,該運算元子任務開始準備進行對齊。
- 運算元子任務將第一個輸入通道的資料快取下來,同時繼續處理其他輸入通道的資料,這個過程被稱為對齊。
- 第二個輸入通道的Checkpoint Barrier抵達該運算元子任務,該運算元子任務執行快照,將二手遊戲賬號拍賣地圖狀態寫入State Backend,然後將ID為n的Checkpoint Barrier向下遊所有輸出通道廣播。
- 對於這個運算元子任務,快照執行結束,繼續處理各個通道中新流入資料,包括剛才快取起來的資料。
資料流圖中的每個運算元子任務都要完成一遍上述的對齊、快照、確認的工作,當最後所有Sink運算元確認完成快照之後,說明ID為n的Checkpoint執行結束,Checkpoint Coordinator向State Backend寫入一些本次Checkpoint的元資料。
之所以要進行對齊,主要是為了保證一個Flink作業所有運算元的狀態是一致的。也就是說,某個ID為n的Checkpoint Barrier從前到後流入所有運算元子任務後,所有運算元子任務都能將同樣的一段資料寫入快照。
快照效能優化方案
前面和大家分享了一致性快照的具體流程,這種方式保證了資料的一致性,但有一些潛在的問題:
- 每次進行Checkpoint前,都需要暫停處理新流入資料,然後開始執行快照,假如狀態比較大,一次快照可能長達幾秒甚至幾分鐘。
- Checkpoint Barrier對齊時,必須等待所有上游通道都處理完,假如某個上游通道處理很慢,這可能造成整個資料流堵塞。
針對這些問題Flink已經有了一些解決方案,並且還在不斷優化。
對於第一個問題,Flink提供了非同步快照(Asynchronous Snapshot)的機制。當實際執行快照時,Flink可以立即向下廣播Checkpoint Barrier,表示自己已經執行完自己部分的快照。同時,Flink啟動一個後臺執行緒,它建立本地狀態的一份拷貝,這個執行緒用來將本地狀態的拷貝同步到State Backend上,一旦資料同步完成,再給Checkpoint Coordinator傳送確認資訊。拷貝一份資料肯定佔用更多記憶體,這時可以利用寫入時複製(Copy-on-Write)的優化策略。Copy-on-Write指:如果這份記憶體資料沒有任何修改,那沒必要生成一份拷貝,只需要有一個指向這份資料的指標,通過指標將本地資料同步到State Backend上;如果這份記憶體資料有一些更新,那再去申請額外的記憶體空間並維護兩份資料,一份是快照時的資料,一份是更新後的資料。
對於第二個問題,Flink允許跳過對齊這一步,或者說一個運算元子任務不需要等待所有上游通道的Checkpoint Barrier,直接將Checkpoint Barrier廣播,執行快照並繼續處理後續流入資料。為了保證資料一致性,Flink必須將那些較慢的資料流中的元素也一起快照,一旦重啟,這些元素會被重新處理一遍。
State Backend
前面已經分享了Flink的快照機制,其中State Backend起到了持久化儲存資料的重要功能。Flink將State Backend抽象成了一種外掛,並提供了三種State Backend,每種State Backend對資料的儲存和恢復方式略有不同。接下來我們開始詳細瞭解一下Flink的State Backend。
MemoryStateBackend
從名字中可以看出,這種State Backend主要基於記憶體,它將資料儲存在Java的堆區。當進行分散式快照時,所有運算元子任務將自己記憶體上的狀態同步到JobManager的堆上,一個作業的所有狀態要小於JobManager的記憶體大小。這種方式顯然不能儲存過大的狀態資料,否則將丟擲OutOfMemoryError
異常。因此,這種方式只適合除錯或者實驗,不建議在生產環境下使用。下面的程式碼告知一個Flink作業使用記憶體作為State Backend,並在引數中指定了狀態的最大值,預設情況下,這個最大值是5MB。
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE))
如果不做任何配置,預設情況是使用記憶體作為State Backend。
FsStateBackend
這種方式下,資料持久化到檔案系統上,檔案系統包括本地磁碟、HDFS以及包括Amazon、阿里雲在內的雲端儲存服務。使用時,我們要提供檔案系統的地址,尤其要寫明字首,比如:file://
、hdfs://
或s3://
。此外,這種方式支援Asynchronous Snapshot,預設情況下這個功能是開啟的,可加快資料同步速度。
// 使用HDFS作為State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))
// 使用阿里雲OSS作為State Backend
env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"))
// 使用Amazon作為State Backend
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"))
// 關閉Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false))
Flink的本地狀態仍然在TaskManager的記憶體堆區上,直到執行快照時狀態資料會寫到所配置的檔案系統上。因此,這種方式能夠享受本地記憶體的快速讀寫訪問,也能保證大容量狀態作業的故障恢復能力。
RocksDBStateBackend
這種方式下,本地狀態儲存在本地的RocksDB上。RocksDB是一種嵌入式Key-Value資料庫,資料實際儲存在本地磁碟上。比起FsStateBackend
的本地狀態儲存在記憶體中,RocksDB利用了磁碟空間,所以可儲存的本地狀態更大。然而,每次從RocksDB中讀寫資料都需要進行序列化和反序列化,因此讀寫本地狀態的成本更高。快照執行時,Flink將儲存於本地RocksDB的狀態同步到遠端的儲存上,因此使用這種State Backend時,也要配置分散式儲存的地址。Asynchronous Snapshot在預設情況也是開啟的。
此外,這種State Backend允許增量快照(Incremental Checkpoint),Incremental Checkpoint的核心思想是每次快照時只對發生變化的資料增量寫到分散式儲存上,而不是將所有的本地狀態都拷貝過去。Incremental Checkpoint非常適合超大規模的狀態,快照的耗時將明顯降低,同時,它的代價是重啟恢復的時間更長。預設情況下,Incremental Checkpoint沒有開啟,需要我們手動開啟。
// 開啟Incremental Checkpoint
val enableIncrementalCheckpointing = true
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))
相比FsStateBackend
,RocksDBStateBackend
能夠支援的本地和遠端狀態都更大,Flink社群已經有TB級的案例。
除了上述三種之外,開發者也可以自行開發State Backend的具體實現。
重啟恢復流程
Flink的重啟恢復邏輯相對比較簡單:
- 重啟應用,在叢集上重新部署資料流圖。
- 從持久化儲存上讀取最近一次的Checkpoint資料,載入到各運算元子任務上。
- 繼續處理新流入的資料。
這樣的機制可以保證Flink內部狀態的Excatly-Once一致性。至於端到端的Exactly-Once一致性,要根據Source和Sink的具體實現而定。當發生故障時,一部分資料有可能已經流入系統,但還未進行Checkpoint,Source的Checkpoint記錄了輸入的Offset;當重啟時,Flink能把最近一次的Checkpoint恢復到記憶體中,並根據Offset,讓Source從該位置重新發送一遍資料,以保證資料不丟不重。像Kafka等訊息佇列是提供重發功能的,socketTextStream
就不具有這種功能,也意味著不能保證Exactly-Once投遞保障。
Checkpoint相關配置
預設情況下,Checkpoint機制是關閉的,需要呼叫env.enableCheckpointing(n)
來開啟,每隔n毫秒進行一次Checkpoint。Checkpoint是一種負載較重的任務,如果狀態比較大,同時n值又比較小,那可能一次Checkpoint還沒完成,下次Checkpoint已經被觸發,佔用太多本該用於正常資料處理的資源。增大n值意味著一個作業的Checkpoint次數更少,整個作業用於進行Checkpoint的資源更小,可以將更多的資源用於正常的流資料處理。同時,更大的n值意味著重啟後,整個作業需要從更長的Offset開始重新處理資料。
此外,還有一些其他引數需要配置,這些引數統一封裝在了CheckpointConfig
裡:
val cpConfig: CheckpointConfig = env.getCheckpointConfig
預設的Checkpoint配置是支援Exactly-Once投遞的,這樣能保證在重啟恢復時,所有運算元的狀態對任一條資料只處理一次。用上文的Checkpoint原理來說,使用Exactly-Once就是進行了Checkpoint Barrier對齊,因此會有一定的延遲。如果作業延遲小,那麼應該使用At-Least-Once投遞,不進行對齊,但某些資料會被處理多次。
// 使用At-Least-Once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
如果一次Checkpoint超過一定時間仍未完成,直接將其終止,以免其佔用太多資源:
// 超時時間1小時
env.getCheckpointConfig.setCheckpointTimeout(3600*1000)
如果兩次Checkpoint之間的間歇時間太短,那麼正常的作業可能獲取的資源較少,更多的資源被用在了Checkpoint上。對這個引數進行合理配置能保證資料流的正常處理。比如,設定這個引數為60秒,那麼前一次Checkpoint結束後60秒內不會啟動新的Checkpoint。這種模式只在整個作業最多允許1個Checkpoint時適用。
// 兩次Checkpoint的間隔為60秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)
預設情況下一個作業只允許1個Checkpoint執行,如果某個Checkpoint正在進行,另外一個Checkpoint被啟動,新的Checkpoint需要掛起等待。
// 最多同時進行3個Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)
如果這個引數大於1,將與前面提到的最短間隔相沖突。
Checkpoint的初衷是用來進行故障恢復,如果作業是因為異常而失敗,Flink會儲存遠端儲存上的資料;如果開發者自己取消了作業,遠端儲存上的資料都會被刪除。如果開發者希望通過Checkpoint資料進行除錯,自己取消了作業,同時希望將遠端資料儲存下來,需要設定為:
// 作業取消後仍然儲存Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RETAIN_ON_CANCELLATION
模式下,使用者需要自己手動刪除遠端儲存上的Checkpoint資料。
預設情況下,如果Checkpoint過程失敗,會導致整個應用重啟,我們可以關閉這個功能,這樣Checkpoint失敗不影響作業的執行。
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
1. Leslie Lamport, K. Mani Chandy: Distributed Snapshots: Determining Global States of a Distributed System. In:ACM Transactions on Computer Systems 3. Nr. 1, Februar 1985.