1. 程式人生 > 其它 >kafka之淺談如何去保證資料不重複消費

kafka之淺談如何去保證資料不重複消費

技術標籤:kafka工作日記javakafkahadoopredis後端

kafka之淺談如何去保證資料不重複消費

一。背景:上游資料流,將資料推入kafka中,作為消費者,消費資料並進行處理,對於交易資料,非常敏感,不能出現重複,在消費這一過程中,如何去保證我們不會去重複消費資料。

二。導致資料重複消費的原因一般有:
1.資料消費處理成功(落地入庫,或者各種處理成功),向kafka中提交偏移量時,由於宕機,或者斷網之類的失敗了,這時候其實相對與系統來說,這筆資料已經是處理過了,就會出現重複資料。
2.一般是有新的消費者加入之類的,發生了再均衡,導致資料重發消費。
三。專案使用真實使用

方案:
1.關閉自動提交,設定enable-auto-commit為 false,採用手動Offset提交
2.提交方式,簡單提一下兩種提交方式的卻別
consumer.commitAsync();//非同步提交
非同步提交:非同步提交,非阻塞,有更好的併發效能,但是同時也是還會提交失敗,導致資料重複。
consumer.commitSync();//同步提交
同步提交:同步提交,是阻塞的,在提交失敗時,會一直阻塞,直到超時,可回撥。

這裡我們採用非同步+同步提交,非同步提交失敗後,採用同步提交,一直阻塞,達到超時時間還未提交,執行同步提交回調,可記錄提交失敗的資料
簡單示例:
public void commitOffset() {

try {
consumer.commitAsync();//非同步提交
} catch (CommitFailedException e) {
consumer.commitSync();
}
}
3.每次使用map跟蹤記錄偏移量,並將偏移量維護到資料庫中,可作為唯一標識欄位等。
在這裡插入圖片描述

4.在訂閱資料時,重寫ConsumerRebalandeListener再分割槽監聽器,即繼承ConsumerRebalanceListener介面,
在這裡插入圖片描述
5.當發生再均衡前後也要記錄提交偏移量,根據具體的使用場景,也可以選著將offset維護在庫裡面,比如,在對消費的資料進行落地時這一場景,有可能會出現入庫成功,提交失敗,或者提交成功,入庫失敗等,這一類場景最安全的就是落地維護offset,但是也確實帶來了一定的複雜度,對庫的讀寫效能要求也比較高,redis就比較合適,這裡不深挖了。

重寫方法:onPartitionsRevoked,該方法會在再均衡開始之前和消費者停止讀取訊息之後被呼叫。在這個方法裡,我們需要做的是,把map中跟蹤的偏移量進行提交。
簡單例子:
public void onPartitionsRevoked(Collection collection) {
consumer.commitSync();
if(logger.isInfoEnabled()){
logger.info("*- in ralance:onPartitionsRevoked");
}
}
重寫方法:onPartitionsAssigned,該方法會在重新分配partition之後和消費者開始讀取訊息之前被呼叫。這裡我們需要做的是,讀取我們庫中維護的offset,並使用seek設定開始offset位置。
@Override
public void onPartitionsAssigned(Collection collection) {
//rebalance之後 獲取新的分割槽,獲取最新的偏移量,設定拉取分量
for (TopicPartition partition : collection) {

        if(logger.isInfoEnabled()){
            logger.info("*- partition:" + partition.partition());
        }
        //獲取消費偏移量,實現原理是向協調者傳送獲取請求

        OffsetAndMetadata offset = consumer.committed(partition);
        //設定本地拉取分量,下次拉取訊息以這個偏移量為準

        consumer.seek(partition,getOffsetFromDB(partition));
    }

這裡主要體現思路,具體的方案,一切從實際業務出發,不然就是耍流氓,但是大多思路不變,變動的是細節

熱愛生活的碼小子wmxiang:
工作之餘,記錄自己平時的一些小毛病,以及問題排查和解決方案思路。記錄自己的經驗和不足之處,筆記中可能會有很多不足之處,歡迎各位留言指正討論。