1. 程式人生 > >訊息佇列-核心概念三

訊息佇列-核心概念三

如何保證訊息不被重複消費(保證訊息的冪等性)

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能會出現訊息重複消費的問題,正常。因為這問題通常不是 MQ 自己保證的,是由我們開發來保證的。挑一個 Kafka 來舉個例子,說說怎麼重複消費吧。

kafkas=實際有個offset概念,就是每個訊息寫進去,都有一個offset,代表訊息的序號,消費者consumer消費了訊息之後,會定期把自己消費的訊息offset提交一下,表示我已經消費過了,下次我要是重啟啥的,就從上次消費到的offset訊息開始繼續消費。

但凡總有意外,比如我們之前生產經常遇到的,就是你有時候重啟系統,看你怎麼重啟了,如果碰到點著急的,直接 kill 程序了,再重啟。這會導致消費者有些訊息處理了,但是沒來的及提交offset,尷尬了,再次重啟,少量訊息會再消費一次。

舉個栗子。

有這麼個場景。資料 1/2/3 依次進入 kafka,kafka 會給這三條資料每條分配一個 offset,代表這條資料的序號,我們就假設分配的 offset 依次是 152/153/154。消費者從 kafka 去消費的時候,也是按照這個順序去消費。假如當消費者消費了 offset=153的這條資料,剛準備去提交 offset 到 zookeeper,此時消費者程序被重啟了。那麼此時消費過的資料 1/2 的 offset 並沒有提交,kafka 也就不知道你已經消費了 offset=153 這條資料。那麼重啟之後,消費者會找 kafka 說,嘿,哥兒們,你給我接著把上次我消費到的那個地方後面的資料繼續給我傳遞過來。由於之前的 offset 沒有提交成功,那麼資料 1/2 會再次傳過來,如果此時消費者沒有去重的話,那麼就會導致重複消費。

如果消費者乾的事兒是拿一條資料就往資料庫裡寫一條,會導致說,你可能就把資料 1/2 在資料庫裡插入了 2 次,那麼資料就錯啦。

其實重複消費不可怕,可怕的是你沒考慮到重複消費之後,怎麼保證冪等性

舉個例子吧。假設你有個系統,消費一條訊息就往資料庫裡插入一條資料,要是你一個訊息重複兩次,你不就插入了兩條,這資料不就錯了?但是你要是消費到第二次的時候,自己判斷一下是否已經消費過了,若是就直接扔了,這樣不就保留了一條資料,從而保證了資料的正確性。

一條資料重複出現兩次,資料庫裡就只有一條資料,這就保證了系統的冪等性

冪等性,通俗點說,就一個數據,或者一個請求,給你重複來多次,你得確保對應的資料是不會改變的,不能出錯

所以第二個問題來了,怎麼保證訊息佇列消費的冪等性?

其實還得結合實際業務來思考,有以下幾點思路:

    比如,你那個資料寫庫,先根據主鍵查下,如果這資料都有,就別插入了,update以下就行。

    比如你是寫redis,那沒問題,反正每次都是set,天然冪等性。

    假如你不是上面這兩個場景,稍微做的複雜一點,你需要讓生產者傳送每一條資料的時候,裡面加一個全域性唯一id,類似訂單id之類的東西,然後你這裡消費到了之後,先根據id去比如redis裡面查下,之前消費過沒,如果沒有消費過,你就去處理,然後這個id寫redis,如果消費過了,你就別去處理,保證別去處理相同訊息即可。

    比如基於資料庫的唯一鍵來保證重複資料不會重複插入多條。因為有唯一鍵約束了,重複資料插入只會報錯,不會導致資料庫中出現髒資料。

當然,如何保證 MQ 的消費是冪等性的,需要結合具體的業務來