乾貨:Flink+Kafka 0.11端到端精確一次處理語義實現
實時處理裡訊息的僅一次處理是大家關注的重點吧,前面浪尖分享過一篇對比spark streaming 和 flink的文章 <Spark Streaming VS Flink>,裡面講到了如何用spark streaming實現僅一次處理及flink是實現僅一次處理的。本文主要是想詳細闡述一下flink結合kafka 0.11的僅一次處理語義。
本文是浪尖翻譯整理,英文原文閱讀,請點選閱讀原文。
2017年12月Apache Flink社群釋出了1.4版本。該版本正式引入了一個里程碑式的功能:兩階段提交Sink,即TwoPhaseCommitSinkFunction。該SinkFunction提取並封裝了兩階段提交協議中的公共邏輯,自此Flink搭配特定source和sink(特別是0.11版本Kafka)搭建精確一次處理語義( exactly-once semantics
使用者可以閱讀Java文件來學習如何使用TwoPhaseCommitSinkFunction,或者參考Flink官網文件來了解FlinkKafkaProducer011是如何支援 exactly-once semantics的,因為後者正是基於TwoPhaseCommitSinkFunction實現的。
本文將深入討論一下Flink 1.4這個新特性以及其背後的設計思想。在本文中我們將:
1. 描述Flink應用中的checkpoint如何幫助確保exactly-once semantics
2. 展示Flink如何通過兩階段提交協議與source和sink互動以實現端到端的 exactly-once semantics交付保障
3. 給出一個使用TwoPhaseCommitSinkFunction實現 exactly-once semantics的檔案Sink例項
1Flink應用的僅一次處理當談及僅一次處理時,我們真正想表達的是每條輸入訊息只會影響最終結果一次!【譯者:影響應用狀態一次,而非被處理一次】即使出現機器故障或軟體崩潰,Flink也要保證不會有資料被重複處理或壓根就沒有被處理從而影響狀態。長久以來Flink一直宣稱支援 exactly-once semantics是指在一個Flink應用內部。在過去的幾年間,Flink開發出了checkpointing機制,而它則是提供這種應用內僅一次處理的基石。
在繼續之前我們簡要總結一下checkpointing演算法,這對於我們瞭解本文內容至關重要。簡單來說,一個Flink checkpoint是一個一致性快照,它包含:
1. 應用的當前狀態
2. 消費的輸入流位置
Flink會定期地產生checkpoint並且把這些checkpoint寫入到一個持久化儲存上,比如S3或HDFS。這個寫入過程是非同步的,這就意味著Flink即使在checkpointing過程中也是不斷處理輸入資料的。
如果出現機器或軟體故障,Flink應用重啟後會從最新成功完成的checkpoint中恢復——重置應用狀態並回滾狀態到checkpoint中輸入流的正確位置,之後再開始執行資料處理,就好像該故障或崩潰從未發生過一般。
在Flink 1.4版本之前,僅一次處理只限於Flink應用內。Flink處理完資料後需要將結果傳送到外部系統,這個過程中Flink並不保證僅一次處理。但是Flink應用通常都需要接入很多下游子系統,而開發人員很希望能在多個系統上維持僅一次處理語義,即維持端到端的僅一次處理語義。
為了提供端到端的僅一次處理語義,僅一次處理語義必須也要應用於Flink寫入資料的外部系統——故這些外部系統必須提供一種手段允許提交或回滾這些寫入操作,同時還要保證與Flink checkpoint能夠協調使用。
在分散式系統中協調提交和回滾的一個常見方法就是使用兩階段提交協議。下一章節中我們將討論下Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協議來實現exactly-once semantics的。
2Flink實現僅一次語義的應用下面將給出一個例項來幫助瞭解兩階段提交協議以及Flink如何使用它來實現僅一次處理語義。該例項從Kafka中讀取資料,經處理之後再寫回到Kafka。Kafka是非常受歡迎的訊息佇列,而Kafka 0.11.0.0版本正式釋出了對於事務的支援——這是與Kafka互動的Flink應用要實現端到端僅一次語義的必要條件。
當然,Flink支援這種僅一次處理語義並不只是限於與Kafka的結合,可以使用任何source/sink,只要它們提供了必要的協調機制。舉個例子,Pravega是Dell/EMC的一個開源流式儲存系統,Flink搭配它也可以實現端到端的exactly-once semantics。
本例中的Flink應用包含以下元件,如上圖所示:
1. 一個source,從Kafka中讀取資料(即KafkaConsumer)
2. 一個時間視窗化的聚會操作
3. 一個sink,將結果寫回到Kafka(即KafkaProducer)
若要sink支援 exactly-once semantics,它必須以事務的方式寫資料到Kafka,這樣當提交事務時兩次checkpoint間的所有寫入操作當作為一個事務被提交。這確保了出現故障或崩潰時這些寫入操作能夠被回滾。
當然了,在一個分散式且含有多個併發執行sink的應用中,僅僅執行單次提交或回滾是不夠的,因為所有元件都必須對這些提交或回滾達成共識,這樣才能保證得到一個一致性的結果。Flink使用兩階段提交協議以及預提交(pre-commit)階段來解決這個問題。
Flink checkpointing開始時便進入到pre-commit階段。具體來說,一旦checkpoint開始,Flink的JobManager向輸入流中寫入一個checkpoint barrier將流中所有訊息分割成屬於本次checkpoint的訊息以及屬於下次checkpoint的。barrier也會在操作運算元間流轉。對於每個operator來說,該barrier會觸發operator狀態後端為該operator狀態打快照。
眾所周知,flink kafka source儲存Kafka消費offset,一旦完成位移儲存,它會將checkpoint barrier傳給下一個operator。
這個方法對於opeartor只有內部狀態的場景是可行的。所謂的內部狀態就是完全由Flink狀態儲存並管理的——本例中的第二個opeartor:時間視窗上儲存的求和資料就是這樣的例子。當只有內部狀態時,pre-commit階段無需執行額外的操作,僅僅是寫入一些已定義的狀態變數即可。當chckpoint成功時Flink負責提交這些寫入,否則就終止取消掉它們。
當時,一旦operator包含外部狀態,事情就不一樣了。我們不能像處理內部狀態一樣處理這些外部狀態。因為外部狀態通常都涉及到與外部系統的互動。如果是這樣的話,外部系統必須要支援可與兩階段提交協議捆綁使用的事務才能確保實現整體的exactly-once semantics。
顯然本例中的data sink是有外部狀態的,因為它需要寫入資料到Kafka。此時的pre-commit階段下data sink在儲存狀態到狀態儲存的同時還必須預提交它的外部事務,如下圖所示:
當checkpoint barrier在所有operator都傳遞了一遍且對應的快照也都成功完成之後,pre-commit階段才算完成。該過程中所有建立的快照都被視為是checkpoint的一部分。其實,checkpoint就是整個應用的全域性狀態,當然也包含pre-commit階段提交的外部狀態。當出現崩潰時,我們可以回滾狀態到最新已成功完成快照時的時間點。
下一步就是通知所有的operator,告訴它們checkpoint已成功完成。這便是兩階段提交協議的第二個階段:commit階段。該階段中JobManager會為應用中每個operator發起checkpoint已完成的回撥邏輯。
本例中的data source和視窗操作無外部狀態,因此在該階段,這兩個opeartor無需執行任何邏輯,但是data sink是有外部狀態的,因此此時我們必須提交外部事務,如下圖所示:
彙總以上所有資訊,總結一下:
1. 一旦所有operator完成各自的pre-commit,它們會發起一個commit操作
2. 倘若有一個pre-commit失敗,所有其他的pre-commit必須被終止,並且Flink會回滾到最近成功完成decheckpoint
3. 一旦pre-commit完成,必須要確保commit也要成功——operator和外部系統都需要對此進行保證。倘若commit失敗(比如網路故障等),Flink應用就會崩潰,然後根據使用者重啟策略執行重啟邏輯,之後再次重試commit。這個過程至關重要,因為倘若commit無法順利執行,就可能出現數據丟失的情況
因此,所有opeartor必須對checkpoint最終結果達成共識:即所有operator都必須認定資料提交要麼成功執行,要麼被終止然後回滾。
3Flink中實現兩階段提交這種operator的管理有些複雜,這也是為什麼Flink提取了公共邏輯並封裝進TwoPhaseCommitSinkFunction抽象類的原因。
下面討論一下如何擴充套件TwoPhaseCommitSinkFunction類來實現一個簡單的基於檔案的sink。若要實現支援exactly-once semantics的檔案sink,我們需要實現以下4個方法:
1. beginTransaction:開啟一個事務,在臨時目錄下建立一個臨時檔案,之後,寫入資料到該檔案中
2. preCommit:在pre-commit階段,flush快取資料塊到磁碟,然後關閉該檔案,確保再不寫入新資料到該檔案。同時開啟一個新事務執行屬於下一個checkpoint的寫入操作
3. commit:在commit階段,我們以原子性的方式將上一階段的檔案寫入真正的檔案目錄下。注意:這會增加輸出資料可見性的延時。通俗說就是使用者想要看到最終資料需要等會,不是實時的。
4. abort:一旦終止事務,我們離自己刪除臨時檔案
當出現崩潰時,Flink會恢復最新已完成快照中應用狀態。需要注意的是在某些極偶然的場景下,pre-commit階段已成功完成而commit尚未開始(也就是operator尚未來得及被告知要開啟commit),此時倘若發生崩潰Flink會將opeartor狀態恢復到已完成pre-commit但尚未commit的狀態。
在一個checkpoint狀態中,對於已完成pre-commit的事務狀態,我們必須儲存足夠多的資訊,這樣才能確保在重啟後要麼重新發起commit亦或是終止掉事務。本例中這部分資訊就是臨時檔案所在的路徑以及目標目錄。
TwoPhaseCommitSinkFunction考慮了這種場景,因此當應用從checkpoint恢復之後TwoPhaseCommitSinkFunction總是會發起一個搶佔式的commit。這種commit必須是冪等性的,雖然大部分情況下這都不是問題。本例中對應的這種場景就是:臨時檔案不在臨時目錄下,而是已經被移動到目標目錄下。
4總結本文的一些關鍵要點:
Flinkcheckpointing機制是實現兩階段提交協議以及提供僅一次語義的基石
與其他系統持久化傳輸中的資料不同,Flink不需要將計算的每個階段寫入到磁碟中
Flink新的TwoPhaseCommitSinkFunction封裝兩階段提交協議的公共邏輯使之搭配支援事務的外部系統來共同構建僅一次語義應用成為可能
自1.4版本起,Flink + Pravega和Kafka 0.11 producer開始支援僅一次語義
Flink Kafka 0.11 producer基於TwoPhaseCommitSinkFunction實現,比起至少一次語義的producer而言開銷並未顯著增加
推薦閱讀: