1. 程式人生 > >Kafka消費者提交操作

Kafka消費者提交操作

消費者每次呼叫poll()方法,它總是返回由生產者寫入Kafka但沒有被消費者讀取過的記錄,我們因此可以知道哪些訊息是被群組裡的哪個消費者讀取的。Kafka不會像JMS佇列那樣需要得到消費者的確認,消費者使用broker裡的MetaData來追蹤訊息在分割槽裡的位置(offset)。

更新分割槽當前位置的操作叫提交。

消費者會向_consumer_offset的特殊主題傳送訊息,訊息裡包含每個分割槽的offset。如果消費者一直處於執行狀態,offset就沒什麼用處。如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡後,消費者可能會得到新的分割槽,而不是原來處理的哪個分割槽。為了能繼續之前的工作,消費者需要讀取每個分割槽最後一次提交的offset,然後從這個offset指定的地方繼續處理。

提交操作會導致訊息重複消費或者丟失。

如果提交的offset小於客戶端處理的最後一個訊息的offset,那麼處於兩個偏移量之間的訊息就會被重複處理。例如,消費者A向分割槽1提交offset,這時傳送了再均衡,分割槽1被分配給消費者B,由於網路原因,A提交的offset還沒有到達broker,那麼由於offset未更新,被A消費的訊息就會被B重複消費。可以維護一個提交的自增版本號,如果這個版本號與提交的版本號一樣,那麼更新offset,否則,拒絕更新offset。

如果提交的offset大於客戶端的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息將會丟失。消費者A消費了分割槽0的一批訊息,訊息還在記憶體中處理,到了自動提交時間,提交了offset,但是隨後消費者掛了,就會導致這部分訊息丟失。

提交方式:自動提交、提交當前偏移量、非同步提交、同步非同步組合提交、提交特定的偏移量。

自動提交:最簡單的提交方式,但是可能導致重複消費訊息。設定enable.auto.commit=true,提交的時間間隔由auto.commit.intrval.ms控制,預設是5s。自動提交是在poll()方法裡進行的,消費者每次進行輪詢時會檢查是否提交該偏移量了,如果是,那麼就會提交分割槽當前偏移量。

要確保發生再均衡之前提交偏移量。