1. 程式人生 > >kafka自動提交offset失敗:Auto offset commit failed

kafka自動提交offset失敗:Auto offset commit failed

今天在服務日誌中觀察資料的消費情況時,發現了一個如下的警告,而且每隔幾秒就會出現一次,雖然只是個警告,

Auto offset commit failed for group order_group:
 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
 This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms,
 which typically implies that the poll loop is spending too much time message processing.
 You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

但是看到auto offset commit failed時我就不淡定了,不難看出是自動提交offset失敗了,我們都知道,kafka的資料更新消費都是通過在zookeeper中標記一個偏移量(offset)來記錄每個分割槽的消費位置,所以一旦offset更新失敗,不難想象肯定會出現重複消費資料的問題!

通過以上資訊分析大概意思是:kafka消費者在處理訊息時,在指定時間內(session.time.out)沒有處理完,consumer coordinator會由於沒有接受到心跳而掛掉,導致自動提交offset失敗,因此就會像日誌中所說的發生rebalanced(重平衡即重新分配partition給客戶端),而之前提交的offset已經失敗了,所以重新分配的客戶端又會消費之前的資料,接著consumer重新消費,又出現了消費超時,無限迴圈下去。

解決方法:

將enable.auto.commit設定成false,即不採用自動提交方式;

由於使用了spring-kafka,禁止kafka-client自動提交offset,因為就是之前的自動提交失敗,導致offset永遠沒更新,從而轉向使用spring-kafka的offset提交機制。

1)如果auto.commit關掉的話,spring-kafka會啟動一個invoker,這個invoker的目的就是啟動一個執行緒去消費資料,他消費的資料不是直接從kafka裡面直接取的,那麼他消費的資料從哪裡來呢?他是從一個spring-kafka自己建立的阻塞佇列裡面取的。

2)然後會進入一個迴圈,從原始碼中可以看到如果auto.commit被關掉的話, 他會先把之前處理過的資料先進行提交offset,然後再去從kafka裡面取資料。

3)然後把取到的資料丟給上面提到的阻塞列隊,由上面建立的執行緒去消費,並且如果阻塞佇列滿了導致取到的資料塞不進去的話,spring-kafka會呼叫kafka的pause方法,則consumer會停止從kafka裡面繼續再拿資料。

4)接著spring-kafka還會處理一些異常的情況,比如失敗之後是不是需要commit offset這樣的邏輯

參考:https://www.jianshu.com/p/9b444d4b32c0