1. 程式人生 > >kafka訊息交付語義的分析

kafka訊息交付語義的分析

在kafka中,在producer和consumer這兩個維度上都有三種訊息交付的語義:

At most once ---- 訊息可能會丟失但絕不重傳.
At least once ---- 訊息可以重傳但絕不丟失.
Exactly once ---- 每一條訊息只被傳遞一次.

先來看producer

producer設定中有這麼一個選項:

每傳送一次訊息,都會要求broker返回一個訊息回執,即ack。如果ack沒有收到,producer會進行重發,如果設定了重發次數的話。這個ack有三種模式:

// The level of acknowledgement reliability needed from the broker (defaults
// to
WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer. // 等同於jvm kafka中的`request.required.acks` RequiredAcks RequiredAcks type RequiredAcks int16 const ( // 第一個模式,NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 //第二個模式, WaitForLocal waits for
only the local commit to succeed before responding. WaitForLocal RequiredAcks = 1 // 第三個模式,WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1
)

如果RequiredAcks設定為0,在這種情況下,伺服器是否收到請求是沒法保證的,並且引數retries(重發)也不會生效(因為客戶端無法獲得失敗資訊)。此時提供的是At most once的語義。

如果RequiredAcks大於0,producer在沒有收到應答的情況下,會進行重發,此時提供的是At least once的語義。

冪等性保證Exactly once

在kafka 0.11.0.0之前,是無法保證Exactly once的,但從0.11.0.0開始,producer引入了冪等性的概念,保證訊息只會被傳遞一次。

那麼kafka是如何實現的呢,用到了Producer ID(即PID)和Sequence Number。

PID:每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對使用者是不可見的。
Sequence Numbler:對於每個PID,該Producer傳送資料的每個[ Topic, Partition ]都對應一個從0開始單調遞增的Sequence Number。

Broker端在快取中儲存了Sequence Numbler,對於接收的每條訊息,如果其序號比Broker快取中的序號大於1則接受它,否則將其丟棄,這樣就可以防止了訊息重複提交。
但是,以上只能保證單個Producer對於同一個[ Topic, Partition ]的Exactly Once語義。不保證同一個Producer一個topic下不同Partition的冪等。

事務保證Exactly once

從0.11.0.0開始,kafka支援了producer事務。要注意的一點是,不要把操作db的業務邏輯跟操作訊息當成一個事務,其實是有問題的,因為操作DB資料庫的資料來源是DB,訊息資料來源是kfaka,是完全不同兩個資料,一種資料來源(如mysql,kafka)對應一個事務,所以它們是兩個獨立的事務。kafka事務指kafka一系列 生產、消費訊息等操作組成一個事務。db事務是指操作資料庫的一系列增刪改操作組成一個事務。

對於producer和生產訊息來說,如果是隻有寫,即一條訊息要傳送給多個topic,可以使用producer事務來保證要麼都發送了,要麼就都沒有傳送。

如果有消費訊息然後再發送給別的topic,最後提交offset,也可以使用producer事務來保證這一系列操作的原子性。比如消費者提交offset出現問題,導致consumer在重複消費訊息的時候,生產者會重複的生產訊息給另外的消費者。

如果只是消費訊息和提交offset,producer事務就顯得沒有意義了,因為這個和手動提交offset沒有什麼區別。

現在再從consumer角度來看

Consumer 讀取到訊息之後,先進行offset提交,然後再處理訊息,如果訊息處理到一半失敗了,那這條訊息就再也不會被消費了,這對應於at-most-once的語義。

Consumer 讀取到訊息之後,先處理訊息,最後再offset提交。這樣如果處理訊息成功,在offset提交之前服務崩潰了,那麼重啟之後這條訊息會再次被消費到,這對應於at-least-once的語義。

如果要Exactly once語義,則可以使用如下手段:
消費處理失敗指的是業務失敗或者操作db失敗。
消費處理成功指的是業務成功或者操作db成功。
1,如果消費處理失敗的話需要額外記錄此條訊息的offset,對於有順序要求的消費來說,此時還得停止消費。下次再統一去消費這些處理失敗的offset的訊息。
2,同樣消費處理失敗,也可以利用producer事務來保證,比如提交offset並且把offset傳送到另一個topic中,來保證這一系列的原子性,消費處理失敗了,則中斷事務,offset就不會被髮送到topic中,topic中儲存的還是上次那個offset。
3,如果消費處理成功,需要額外儲存最新提交的offset到檔案系統中,然後提交offset。這樣不管offset提交成功還是失敗,重啟之後都可以從檔案中拿到最新的offset。
4,或者,消費處理成功的同時,比如db操作成功的同時,把offset寫到db中,意思就是consumer將offset儲存和其輸出在相同的位置。然後提交offset。這樣我的輸出位置儲存的也是最新最準確的offset。