Kafka 如何保證訊息不被重複消費?或者說,如何保證訊息消費的冪等性?
阿新 • • 發佈:2022-02-13
如何保證訊息不被重複消費?或者說,如何保證訊息消費的冪等性?
》冪等性,通俗點說,就一個數據,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯。kafka 的機制:Kafka 實際上有個offset 的概念,就是每個訊息寫進去,都有一個offset,代表訊息的序號,然後 consumer 消費了資料之後,每隔一段時間(定時定期),會把自己消費過的訊息的offset提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的offset來繼續消費吧”。
但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎麼重啟了,如果碰到點著急的,直接 kill 程序了,再重啟。這會導致 consumer 有些訊息處理了,但是沒來得及提交offset,尷尬了。重啟之後,少數訊息會再次消費一次。
有這麼個場景。資料 1/2/3 依次進入 kafka,kafka 會給這三條資料每條分配一個 offset,代表這條資料的序號,我們就假設分配的 offset 依次是 152/153/154。消費者從 kafka 去消費的時候,也是按照這個順序去消費。假如當消費者消費了 offset=153 的這條資料,剛準備去提交offset 到 zookeeper,此時消費者程序被重啟了。那麼此時消費過的資料 1/2 的offset 並沒有提交,kafka 也就不知道你已經消費了 offset=153 這條資料。那麼重啟之後,消費者會找kafka 說,嘿,哥兒們,你給我接著把上次我消費到的那個地方後面的資料繼續給我傳遞過來。由於之前的offset 沒有提交成功,那麼資料 1/2 會再次傳過來,如果此時消費者沒有去重的話,那麼就會導致重複消費。
其實還是得結合業務來思考,我這裡給幾個思路:- 比如你拿個資料要寫庫,你先根據主鍵查一下,如果這資料都有了,你就別插入了,update 一下好吧。
- 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個場景,那做的稍微複雜一點,你需要讓生產者傳送每條資料的時候,裡面加一個全域性唯一的 id,類似訂單 id 之類的東西,然後你這裡消費到了之後,先根據這個 id 去比如 Redis 裡查一下,之前消費過嗎?如果沒有消費過,你就處理,然後這個 id 寫Redis。如果消費過了,那你就別處理了,保證別重複處理相同的訊息即可。
- 比如基於資料庫的唯一鍵來保證重複資料不會重複插入多條。因為有唯一鍵約束了,重複資料插入只會報錯,不會導致資料庫中出現髒資料.
如何保證訊息不被重複消費?或者說,如何保證訊息消費的冪等性?
》冪等性,通俗點說,就一個數據,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯。kafka 的機制:Kafka 實際上有個offset 的概念,就是每個訊息寫進去,都有一個offset,代表訊息的序號,然後 consumer 消費了資料之後,每隔一段時間(定時定期),會把自己消費過的訊息的offset提交一下,表示“我已經消費過了,下次我要是重啟啥的,你就讓我繼續從上次消費到的offset來繼續消費吧”。
但是凡事總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎麼重啟了,如果碰到點著急的,直接 kill 程序了,再重啟。這會導致 consumer 有些訊息處理了,但是沒來得及提交offset,尷尬了。重啟之後,少數訊息會再次消費一次。
有這麼個場景。資料 1/2/3 依次進入 kafka,kafka 會給這三條資料每條分配一個 offset,代表這條資料的序號,我們就假設分配的 offset 依次是 152/153/154。消費者從 kafka 去消費的時候,也是按照這個順序去消費。假如當消費者消費了 offset=153 的這條資料,剛準備去提交offset 到 zookeeper,此時消費者程序被重啟了。那麼此時消費過的資料 1/2 的offset 並沒有提交,kafka 也就不知道你已經消費了 offset=153 這條資料。那麼重啟之後,消費者會找kafka 說,嘿,哥兒們,你給我接著把上次我消費到的那個地方後面的資料繼續給我傳遞過來。由於之前的offset 沒有提交成功,那麼資料 1/2 會再次傳過來,如果此時消費者沒有去重的話,那麼就會導致重複消費。
其實還是得結合業務來思考,我這裡給幾個思路:
- 比如你拿個資料要寫庫,你先根據主鍵查一下,如果這資料都有了,你就別插入了,update 一下好吧。
- 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
- 比如你不是上面兩個場景,那做的稍微複雜一點,你需要讓生產者傳送每條資料的時候,裡面加一個全域性唯一的 id,類似訂單 id 之類的東西,然後你這裡消費到了之後,先根據這個 id 去比如 Redis 裡查一下,之前消費過嗎?如果沒有消費過,你就處理,然後這個 id 寫Redis。如果消費過了,那你就別處理了,保證別重複處理相同的訊息即可。
- 比如基於資料庫的唯一鍵來保證重複資料不會重複插入多條。因為有唯一鍵約束了,重複資料插入只會報錯,不會導致資料庫中出現髒資料.