Kafka 0.11.0.0 是如何實現 Exactly-once 語義的
很高興地告訴大家,具備新的里程碑意義的功能的Kafka 0.11.x版本(對應 Confluent Platform 3.3)已經release,該版本引入了exactly-once語義,本文闡述的內容包括:
- Apache Kafka的exactly-once語義;
- 為什麼exactly-once是一個很難解決的分散式問題;
- 使用Kafka Stream API來進行正確的exactly-once流式處理;
Exactly-once 是真正意義上的難題
從理論上來說,Exactly-once delivery是不可能的,它的代價太高無法實際應用到生產環境,包括業內的大牛Mathias Verroaes也這麼認為,它是分散式系統中最難解決的唯二問題:
Exactly-once delivery is a really hard problem
甚至有很多人說這是無法實現的:
Exactly-once delivery is impossible
但現在,我並不認為引入Exactly-once delivery並且支援流處理是一個真正難以解決的問題。首先,讓我們來概述下訊息的精確提交語義。
訊息語義概述
在分散式系統中,構成系統的任何節點都是被定義為可以彼此獨立失敗的。比如在 Kafka中,broker可能會crash,在producer推送資料至topic的過程中也可能會遇到網路問題。根據producer處理此類故障所採取的提交策略型別,我們可以獲得不同的語義:
- at-least-once:如果producer收到來自Kafka broker的確認(ack)或者acks = all,則表示該訊息已經寫入到Kafka。但如果producer ack超時或收到錯誤,則可能會重試傳送訊息,客戶端會認為該訊息未寫入Kafka。如果broker在傳送Ack之前失敗,但在訊息成功寫入Kafka之後,此重試將導致該訊息被寫入兩次,因此訊息會被不止一次地傳遞給最終consumer,這種策略可能導致重複的工作和不正確的結果。
- at-most-once:如果在ack超時或返回錯誤時producer不重試,則該訊息可能最終不會寫入Kafka,因此不會傳遞給consumer。在大多數情況下,這樣做是為了避免重複的可能性,業務上必須接收資料傳遞可能的丟失。
- exactly-once:即使producer重試傳送訊息,訊息也會保證最多一次地傳遞給最終consumer。該語義是最理想的,但也難以實現,這是因為它需要訊息系統本身與生產和消費訊息的應用程式進行協作。例如如果在消費訊息成功後,將Kafka consumer的偏移量rollback,我們將會再次從該偏移量開始接收訊息。這表明訊息傳遞系統和客戶端應用程式必須配合調整才能實現excactly-once。
必須處理的常見災難場景
為了清楚描述實現 exactly-once delivery語義的挑戰,我們來看一個簡單的例子。
假設有某個單程序producer應用在傳送"Hello Kafka"到某個單partition topic(topic_name=EoS),有一個執行在其他節點的單例項consumer從topic里拉資料並進行列印。理想情況下如果沒有任何災難發生的話,"Hello Kafka"將會被exactly-once傳遞,consumer獲取訊息進行消費並提交commit到Kafka去完成這一次訊息處理。即使在這之後consumer掛了或者被重啟,也不會再收到這條訊息。
然而生產環境錯綜複雜,災難場景是無法避免的:
- Broker失敗:Kafka,作為一個高可用、持久化系統,保證每條訊息被持久化並且冗餘多份(假設是n份),所以理論上Kafka可以容忍n-1臺broker宕機。Kafka的備份機制保證了一旦訊息被成功寫入leader replica,將會把資料同步到其他所有replica。
- Producer到Broker的RPC失敗:Kafka的durability特性是基於producer從broker收到的ack的,而沒有收到ack並不代表請求肯定失敗。Broker可能會在訊息被寫入之後返回ack之前宕機,同時也可能會在訊息被寫入topic之前宕機。因為producer沒有任何途徑可以得知失敗的真實原因,而只會嘗試重試。在一些場景下,下游consumer會收到若干的重複資料。
- 客戶端也可能會失敗:Exactly-once delivery也必須考慮客戶端失敗的情況。但是我們如何去區分客戶端是真的掛了(永久性宕機)還是說只是暫時丟失心跳?追求正確性的話,broker應該丟棄由zombie producer傳送的訊息。 consumer也是如此,一旦新的客戶端例項已經啟動,它必須能夠從失敗例項的任何狀態中恢復,並從安全點(safe checkpoint)開始處理,這意味著消費的偏移量必須始終與生成的輸出保持同步。
Apache Kafka的exactly-once語義
在0.11.x版本之前,Apache Kafka支援at-least-once delivery語義以及partition內部的順序delivery,如前所述這在某些場景下可能會導致資料重複消費。而Kafka 0.11.x支援exactly-once語義,不會導致該情況發生,其中主要包括三個內部邏輯的改造:
冪等:partition內部的exactly-once順序語義
冪等操作,是指可以執行多次,而不會產生與僅執行一次不同結果的操作,Producer的send操作現在是冪等的。在任何導致producer重試的情況下,相同的訊息,如果被producer傳送多次,也只會被寫入Kafka一次。要開啟此功能,並讓所有partition獲得exactly-once delivery、無資料丟失和in-order語義,需要修改broker的配置:enable.idempotence = true。
這個功能如何工作?它的工作方式類似於TCP:傳送到Kafka的每批訊息將包含一個序列號,該序列號用於重複資料的刪除。與TCP不同,TCP只能在transient in-memory中提供保證。序列號將被持久化儲存topic中,因此即使leader replica失敗,接管的任何其他broker也將能感知到訊息是否重複。這種機制的開銷相當低:它只是在每批訊息中添加了幾個額外欄位。正如本文稍後將會看到的,該功能僅僅在非冪等producer上增加了可忽略的效能開銷。
事務:跨partition的原子性寫操作
第二點,Kafka現在支援使用新事務API原子性的對跨partition進行寫操作,該API允許producer傳送批量訊息到多個partition。該功能同樣支援在同一個事務中提交消費者offsets,因此真正意義上實現了end-to-end的exactly-once delivery語義。以下是一段示例程式碼:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
該程式碼片段描述瞭如何使用新的producer事務API原子性的傳送訊息至多個partition。值得注意的是,某個Kafka topic partition內部的訊息可能是事務完整提交後的訊息,也可能是事務執行過程中的部分訊息。
而從consumer的角度來看,有兩種策略去讀取事務寫入的訊息,通過"isolation.level"來進行配置:
read_committed
:可以同時讀取事務執行過程中的部分寫入資料和已經完整提交的事務寫入資料;read_uncommitted
:完全不等待事務提交,按照offsets order去讀取訊息,也就是相容0.11.x版本前Kafka的語義;
我們必須通過配置consumer端的配置isolation.level
,來正確使用事務API,通過使用 new Producer API並且對一些unique ID設定transaction.id
(該配置屬於producer端),該unique ID用於提供事務狀態的連續性。
Exactly-once 流處理
基於冪等和原子性,通過Streams API實現exactly-once流處理成為可能。如果要在流應用中實現相關語義,只需要配置 processing.guarantee=exactly_once
,這會影響所有的流處理環境中的語義,包括將處理作業和由加工作業建立的所有物理狀態同時寫回到Kafka的操作。
這就是為什麼Kafka Streams API提供的exactly-once保證是迄今為止任何流處理系統中的最強實現的原因。 它為以Kafka作為資料來源的流處理應用程式提供端對端的exactly-once保證,Streams應用程式將任何Kafka的物化狀態在最終環節寫回到Kafka。 僅依靠外部資料系統實現物化狀態的流處理系統僅支援對exactly-once的較弱保證。 即使他們使用Kafka作為流處理來源,在需要從故障中恢復的情況下,也只能rollback他們的Kafka消費者offset以重新消費並處理訊息,而不能回滾關聯狀態,當更新不是冪等的時候會導致結果不正確。
我來解釋下這段話的細節。 流處理系統的關鍵問題是我的流處理應用程式是否獲得正確的答案,即使其中一個例項在處理過程中崩潰,恢復失敗例項時的關鍵是把狀態恢復到與崩潰前相同。
流處理可以看成是一個關於Kafka topic的讀寫操作集合, 消費者從Kafka topic讀取訊息,其他一些處理邏輯轉換訊息或修改cpu維護的狀態,同時生產者將訊息寫入另一個Kafka topic。 Exactly-once流處理就是保證讀寫資料有且只有一次的一種能力。,在這種情況下,獲得正確結果意味著不丟失任何輸入訊息或產生任何重複的輸出,而這就是使用者所期望的。
除了我們迄今為止討論的簡單災難場景之外,還有許多其他故障情況需要考慮:
- 流處理器可能會從多個source topic獲取輸入,並且跨多個source topic的資料排序不是確定的,因此多次執行可能會產生不同的結果;
- 同樣,流處理器可能產生多個dest topic的輸出。如果生產者無法跨多個topic執行原子寫入,如果對某些(但不是全部)分割槽的寫入失敗,則producer的輸出可能不正確;
- 流處理器可以使用Streams API提供的managed state facilities去聚合或join多個輸入的資料。如果流處理器的一個例項失敗,那麼需要能夠回滾該流處理器例項的物化狀態。在重新啟動例項時,還需要能夠恢復處理並重新建立其狀態。
- 流處理器可以查詢外部資料庫或者調通服務來豐富資訊。基於外部服務的流處理器基本上來說是非確定性的:如果外部服務在流處理器的兩次執行之間改變其內部狀態,則會導致下游的結果出錯。但是,如果處理正確,則不會導致完全不正確的結果,而僅僅會導致流處理器的輸出是期望輸出的子集。
特別是當與非確定性操作和應用程式計算的持久狀態的更改相結合時,如果例項失敗或者重新啟動,可能導致資料重複甚至是計算結果錯誤。
"流處理保證確定性操作exactly-once的正確方法是:保證讀取寫入操作的輸出在任何非災難場景下一致。"
針對非確定性操作的exactly-one流處理
Exactly-once流處理對確定性操作是有意義的,但是當處理邏輯本身存在不確定的邏輯時呢?假設有這樣一個場景,流處理器用於計算滿足條件的流入的事件數量,條件由外部服務動態決定。從根本上來說這種操作本質上是非決定性的,因為外部服務指定的條件是不確定的,這可能會導致下游資料流得到不同的結果。那麼,對這樣的非確定性操作來說,正確的策略又是什麼呢?
"對於非確定性操作來說,正確的處理方式是確保讀取寫入流處理操作的輸出屬於預期輸出的子集,該集合應該可以由非確定性輸入得到的預期值組合得到。"
因此,對於我們的示例流處理器,假設當前計數為31,輸入事件值為2,故障時正確輸出只能是31或者33其中一個:如果輸入事件被外部條件指定需要丟棄那麼就是31 ,反之則為33。
Kafka的exactly-once保證真的起作用了嗎?
為了回答這個關於Kafka exactly-once保證的問題,讓我們來看看正確性(也就是我們如何設計,構建和測試這個功能)和效能。
精妙的設計和review過程
正確性和效能都從堅實的設計開始。 大約三年前,我們開始在LinkedIn上進行設計和原型開發工作。 我們在Confluent上尋求一個優雅的方式來將冪等和事務的功能性要求融合成一個整體的封裝。 我們寫了一個60+頁的設計文件,概述了設計的各個方面:從高階訊息流到每個資料結構和RPC的細節實現細節。 經過9個月的廣泛公眾監督,設計也從社群的不斷反饋中大大得到改善。 例如,基於開源討論,我們用更智慧的伺服器端過濾替代消費者端快取以進行事務讀取,從而避免了潛在的效能開銷。 同時,我們也改進了事務與compacted topic,並增加了相應的安全機制。
最終我們機智地得到了一個極簡設計,在很大程度上也依賴於強大的Kafka原型:
- 事務日誌是一個Kafka topic,享受到了與生俱來的durability;
- Broker內部新增了事務協調執行緒(用於管理每個生產者的事務狀態),自然地利用了Kafka自有的選舉演算法來處理failover;
- 對於使用了Kafka Streams API構建的流處理應用程式,我們會將資料透明地fold起來合併成原子性操作以事務的形式寫入多個分割槽,從而為讀取寫入操作提供exactly-once保證;
這種足夠簡單、專注於細節的設計,實施效果非常好。
迭代的開發過程
我們在開發該功能時,會確保每一個pull request經過廣泛的審查。這意味著在幾個月的時間內一些pull request經歷過幾十次迭代,審查過程中發現了之前設計上沒有考慮到的無數邊界問題。
我們編寫了超過15,000個測試用例,包括分散式測試,執行時的故障測試。該流程揭示了各個方面的問題,從測試工具中的基本編碼錯誤到深奧的NTP同步問題。其中的一個子集是分散式混沌測試,我們為多個事務客戶端提供了一個完整的Kafka叢集,通過事務產生訊息,同時讀取這些訊息,並在過程中強行終止客戶端或伺服器,以確保資料既不丟失也不重複。
因此經過良好測試,高質量程式碼庫的簡單而堅固的設計構成了我們解決方案的基石。
好訊息:Kafka 還是非常快!
在設計此功能時,一個重點是效能的保證:由於exactly-once設計帶來的效能開銷,我們淘汰了許多更簡單的設計選型。經過多番思考,我們採用的設計儘可能地使每個事務的開銷最小(每個分割槽約1次寫入,儘可能少的寫入記錄至中心事務日誌)。對於耗時100ms的1KB訊息和事務寫入,與配置為at-least-once並且保序交付(acks = all,max.in.flight.requests.per.connection = 1)的生產者的吞吐量相比吞吐量僅下降3%;與at-most-once並且無排序保證(acks = 1,max.in.flight.requests.per.connection = 5)的生產者的吞吐量相比下降20%。
具體的測試benchmark可以看這裡。
除了確保新功能的低效能開銷之外,我們也不希望在沒有使用exactly-once功能的應用程式中看到效能有意外損耗。為了確保這一點,我們不僅在Kafka訊息頭中添加了一些新的欄位來實現exactly-once功能,而且還重新設計了Kafka訊息格式,在網路傳輸和磁碟儲存時,更有效地壓縮訊息。特別是,我們將一大堆常見的元資料轉移到批量標頭檔案中,並將可變長度編碼引入批次中的每個記錄。通過這種批量優化,整體資訊的size顯著減小。例如,一批7條記錄、每條10個位元組的批量訊息,使用新的格式將減少35%的體量,這使得生產者吞吐量提高了20%,處理小訊息時提高了50%的消費者吞吐量。任何Kafka 0.11使用者都可以使用此效能提升,即使沒有使用任何exactly-once功能。
我們還著眼於優化Streams API中的exactly-once流處理的開銷。 以100ms作為提交間隔的情況下(保證端到端延遲較低的一個值),我們看到吞吐量下降了15%至30%(損耗百分比取決於訊息大小,前者為1KB的訊息大小,後者為100位元組)。 但是,對於>=1KB的訊息,30秒的提交間隔是沒有任何吞吐效能損耗的。 在下一個版本中,我們計劃引入推測性執行機制:即使我們使用較大的提交間隔,我們也可以保持端到端的延遲較低,最終我們期望將事務的開銷降至零。
總而言之,通過從根本上重新調整我們的一些核心資料結構,我們在較小的效能損耗下實現了冪等和事務功能使得Kafka在大部分場景下依然很快。
這個魔法小精靈粉塵可以灑在我的應用程式上嗎?
Exacrtly-once處理是一種端到端的保證,在灑上去之前應用程式必須保證自身設計不違反該原則。 如果您使用的是消費者API,則必須保證你提交的應用程式狀態變更和你的偏移量是一致的。
對於流處理應用,情況會更好一些。 因為流處理是一個封閉的系統,其中輸入、輸出和狀態修改都在相同的操作中建模,它實際上已經類似於exactly-once中的事務,具備原子性了。 配置更改就直接可以為您提供端到端的保證。 但是,您仍然需要從Kafka獲取資料,當與exactly-once的connector組合時,將直接擁有該特性。