1. 程式人生 > >使用訊息中介軟體時,如何保證訊息僅僅被消費一次?

使用訊息中介軟體時,如何保證訊息僅僅被消費一次?

訊息中介軟體使用廣泛,常用來削峰填谷、系統解耦、非同步處理。非同步處理可能是使用的最多的場景了,比如現在的技術部落格網站,都採用積分制,使用者發表一篇文章後,可以獲取想要的積分,為了提升系統的效能,給使用者加積分的操作可以非同步處理,並不需要放在同步流程中。 我們可以把使用者ID,需要增加的積分封裝成一條訊息投遞到訊息系統中,非同步處理加積分操作,由於這是發生在不同伺服器之間,訊息有可能投遞失敗、處理失敗等問題,從而導致使用者加積分失敗,還有一種可能是訊息重複投遞,那麼使用者就有可能重複加積分,不管出現那種情況,都是不正常的情況。 要避免上面的兩種情況,就需要我們儘量保證訊息不丟失和訊息只被消費一次,這篇文章拋開具體的訊息中介軟體,從訊息系統的通用層面上,談談如何避免這兩種情況。 # 1、保證訊息不丟失 一條訊息從生產到消費這條鏈路中,有三個地方可能會造成訊息丟失,分別如下: - 訊息從生產者寫入到訊息佇列的過程投遞失敗。 - 訊息在訊息佇列中,持久化失敗。 - 訊息被消費者消費的過程出現異常。 ## 1.1 在訊息生產的過程中投遞失敗 訊息生產者和訊息系統一般都是獨立部署在不同的伺服器上,兩臺伺服器之間要通訊就要通過網路來完成,網路是不穩定,可能會發生抖動,那麼資料就有可能丟失。網路發生抖動會有以下兩種情況。 ![在訊息生產的過程中丟失訊息](https://user-gold-cdn.xitu.io/2020/3/11/170c9e0cfc2619c1?w=1105&h=307&f=png&s=27458) 情景一:訊息在傳送給訊息系統的過程中發生網路抖動,資料直接丟失。 情景二:訊息已經到達訊息系統,但是在訊息系統給生產者伺服器返回資訊時,網路發生抖動,此時的資料不一定真正的丟失,很可能只是生產者認為資料丟失。 針對訊息在訊息生產時丟失,可以採取重投機制,當程式檢測到網路異常時,將訊息再次投遞到訊息系統。但是重新投遞在情景二情況下,可能造成資料重複,如何解決這個問題,在後面會提到。 ## 1.2 在訊息佇列中持久化失敗 訊息系統是可以對訊息進行持久化,一般都是將訊息儲存到本地磁碟中,當然也有少數訊息中介軟體支援將資料持久化到資料庫中,那麼訊息系統的效能可能就會下降。 如果你對 Redis 的持久化有一定的瞭解話,你會發現 Redis 在持久化資料時並不是每新增一條就立即存入到本地磁碟,而是會將資料先寫入到作業系統的 Page Cache 中,當滿足一定條件時,再將 Page Cache 中的資料刷入磁碟,因為這樣可以減少對磁碟的隨機 I/O 操作,我們知道隨機 I/O 是非常耗時的,這樣也提高了系統性能,訊息中介軟體也不例外,在持久化時也是採用這種方式。 **在某些極端情況下,可能會造成 Page Cache 中的資料丟失,比如突然停電或者機器異常重啟操作。要解決 Page Cache 中資料丟失問題,可以採用叢集部署的方式,來儘量保證資料不丟失。** ## 1.3 在消費的過程中存在訊息丟失 訊息在消費過程中也是會發生丟失的,而且在消費過程中丟失的概率要比前兩種情況大很多。一條訊息消費過程大概分成三步:消費者拉取訊息,消費者處理訊息,訊息系統更新消費進度。 ![圖片描述](https://user-gold-cdn.xitu.io/2020/3/11/170c9e0cfa9b3ad5?w=561&h=180&f=png&s=12404) 第一步在拉取訊息的時候可能發生網路抖動異常,第二步在處理訊息的時候可能發生一些業務異常,而導致流程並沒有走完,如果在第一步、第二步發生異常的情況下,通知訊息系統更新消費進度,那麼這條失敗的訊息就永遠不會在被處理了,自然就丟失了,其實我們的業務並沒有跑完。 **要避免訊息在消費時丟失的情況,可以在訊息接收和處理完成之後才更新消費進度,但是在極端的情況下,會出現訊息重複消費的問題,比如某一條訊息在處理完成之後,消費者宕機了,這時還沒有更新消費進度,消費者重啟後,這條訊息還是會被消費到。** # 2、如何保證訊息只被消費一次 訊息系統本身不能保證訊息僅被消費一次,因為消費本身可能重複、下游系統啟動拉取重複、失敗重試帶來的重複、補償邏輯導致的重複都有可能造重複訊息,**要保證訊息僅被消費一次可以利用等冪性來實現**。 **等冪是數學上的一個概念,就是多次執行同一個操作和執行一次操作,最終得到的結果是相同的。** 從等冪的概念上就可以看出來,就算訊息執行多次也不會對系統造成影響,那麼在使用訊息系統時如何保證等冪性呢?因為生產者和消費者都有可能產生重複訊息,所以要在生產者和消費者兩端都保證等冪性。 **保證生產者等冪性**,在生產訊息的時候,利用雪花演算法給訊息生成一個全域性 ID,在訊息系統中維護訊息已 ID 對映關係,如果在對映表中已經存在相同 ID,這丟棄這條訊息,雖然訊息被投遞了兩次,但是實際上就儲存了一條,避免了訊息重複問題。 生產者等冪性跟所選者的訊息中介軟體有關係,因為絕大數情況下訊息系統不需要我們自己實現,所以等冪性是不太好控制的,**消費者等冪性才是我們開發人員控制的重點方向**。 **在消費者端可以從通用層和業務層兩個方面來做等冪操作,取決於我們的業務要求。** 在通用層面中,利用好訊息生成是產生的全域性唯一ID,訊息被處理成功後,把這個全域性 ID 存入到資料中,在處理下一條訊息之前,先從資料庫中查詢這個全域性 ID 是否存在,如果已經存在,則直接放棄該訊息。 利用這個全域性唯一ID就實現了訊息等冪性,虛擬碼如下: ```java boolean isIDExisted = selectByID(ID); // 判斷ID是否存在 if(isIDExisted) { return; //存在則直接返回 } else { process(message); //不存在,則處理訊息 saveID(ID); //儲存ID } ``` 但是在極端情況下,這種方式還是會出問題,如果訊息在處理之後,還沒來得及儲存到資料庫,消費者就宕機重啟了,重啟之後還會再次獲取該訊息,執行時查詢該訊息並未被消費過,還是會執行兩次消費。可以引入資料庫事務來解決這個問題,但是會降低系統性能。如果對訊息重複消費沒有特別嚴格要求的話,直接使用這種沒有引入事務的通用方案就好了,畢竟這也是極小概率的事情。 在業務層面上,我們可選擇性就變多了,比如樂觀鎖、悲觀鎖、記憶體去重(https://github.com/RoaringBitmap/RoaringBitmap)等方法。 我們拿樂觀鎖來舉例,比如我們要給一個使用者加積分,因為加積分操作並不需要放在主業務中,所以就可以使用訊息系統來非同步通知,要使用樂觀鎖,就需要給積分表新增一個版本號欄位。並且在生產訊息的時候先查詢這個賬號的版本號並且連同訊息一起傳送到訊息系統中。 ![圖片描述](https://user-gold-cdn.xitu.io/2020/3/11/170c9e0cfaec5a72?w=866&h=493&f=png&s=29751) 消費者拿到訊息和版本號後,在執行更新積分操作的 SQL 時帶上版本號,類似於: ```sql update score set score = score + 20, version=version+1 where userId=1 and version=1; ``` 這條訊息消費成功後,version 就變成了 2,那麼如果有重複的 version=1 的訊息再次被消費者拉取到,SQL 語句並不會執行成功,從而保證了訊息的冪等性。 要保證訊息僅被消費一次,我們需要把重點放在消費者這一段,利用等冪性來保證訊息被消費一次。 今天站在訊息中介軟體的通用層面上,聊了聊如何保證資料不丟失和僅被消費一次,希望今天的文章對您的學習或者工作有所幫助,如果您認為文章有價值,歡迎點個贊,謝謝。 ### 最後 > 目前網際網路上很多大佬都有訊息中介軟體相關文章,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支援。若文中有所錯誤之處,還望提出,謝謝。 ![網際網路平頭哥](https://user-gold-cdn.xitu.io/2020/2/1/16fffec0e10797f6?w=900&h=500&f=png&s=172745)