1. 程式人生 > 實用技巧 >高併發系統設計(十四):【訊息佇列】如何訊息不丟失?並且保證訊息僅僅被消費一次?

高併發系統設計(十四):【訊息佇列】如何訊息不丟失?並且保證訊息僅僅被消費一次?

https://www.cnblogs.com/wt645631686/p/13200665.html

訊息為什麼會丟失

訊息從被寫入到訊息佇列,到被消費者消費完成,這個鏈路上會有哪些地方存在丟失訊息的可能呢?其實,主要存在三個場景:

  • 訊息從生產者寫入到訊息佇列的過程。
  • 訊息在訊息佇列中的儲存場景。
  • 訊息被消費者消費的過程。

1.在訊息生產的過程中丟失訊息

在這個環節中主要有兩種情況。

首先,訊息的生產者一般是我們的業務伺服器,訊息佇列是獨立部署在單獨的伺服器上的。兩者之間的網路雖然是內網,但是也會存在抖動的可能,而一旦發生抖動,訊息就有可能因為網路的錯誤而丟失。

針對這種情況,我建議你採用的方案是訊息重傳:也就是當你發現傳送超時後你就將訊息重新發一次,但是你也不能無限制地重傳訊息。一般來說,如果不是訊息佇列發生故障,或者是到訊息佇列的網路斷開了,重試2~3次就可以了。

不過,這種方案可能會造成訊息的重複,從而導致在消費的時候會重複消費同樣的訊息。比方說,訊息生產時由於訊息佇列處理慢或者網路的抖動,導致雖然最終寫入訊息佇列成功,但在生產端卻超時了,生產者重傳這條訊息就會形成重複的訊息,那麼針對上面的例子,直觀顯示在你面前的就會是你收到了兩個現金紅包。

2.在訊息佇列中丟失訊息

拿Kafka舉例,訊息在Kafka中是儲存在本地磁碟上,而為了減少訊息儲存時對磁碟的隨機I/O,一般會將訊息先寫入到作業系統的Page Cache中,然後再找合適時機重新整理到磁碟上。

比如,Kafka可以配置當達到某一時間間隔,或者累積一定的訊息數量的時候再刷盤,也就是所說的非同步刷盤。

來看一個形象的比喻:假如你經營一個圖書館,讀者每還一本書你都要去把圖書歸位,不僅工作量大而且效率低下,但是如果你可以選擇每隔3小時,或者圖書達到一定數量的時候再把圖書歸位,這樣可以把同一型別的書一起歸位,節省了查詢圖書位置的時間,這樣就可以提高效率了。

不過,如果發生機器掉電或者機器異常重啟,那麼Page Cache中還沒有來得及刷盤的訊息就會丟失了。

那麼怎麼解決呢?

你可能會把刷盤的間隔設定很短,或者設定累積一條訊息就就刷盤,但這樣頻繁刷盤會對效能有比較大的影響,而且從經驗來看,出現機器宕機或者掉電的機率也不高,所以我不建議你這樣做。

如果你的電商系統對訊息丟失的容忍度很低,那麼你可以考慮以叢集方式部署Kafka服務,通過部署多個副本備份資料,保證訊息儘量不丟失。

那麼它是怎麼實現的呢?

Kafka叢集中有一個Leader負責訊息的寫入和消費,可以有多個Follower負責資料的備份。Follower中有一個特殊的集合叫做ISR(in-sync replicas),當Leader故障時,新選舉出來的Leader會從ISR中選擇,預設Leader的資料會非同步地複製給Follower,這樣在Leader發生掉電或者宕機時,Kafka會從Follower中消費訊息,減少訊息丟失的可能。

由於預設訊息是非同步地從Leader複製到Follower的,所以一旦Leader宕機,那些還沒有來得及複製到Follower的訊息還是會丟失。為了解決這個問題,Kafka為生產者提供一個選項叫做“acks”,當這個選項被設定為“all”時,生產者傳送的每一條訊息除了發給Leader外還會發給所有的ISR,並且必須得到Leader和所有ISR的確認後才被認為傳送成功。這樣,只有Leader和所有的ISR都掛了,訊息才會丟失

從上面這張圖來看,當設定“acks=all”時,需要同步執行1,3,4三個步驟,對於訊息生產的效能來說也是有比較大的影響的,所以你在實際應用中需要仔細地權衡考量。我給你的建議是:

1.如果你需要確保訊息一條都不能丟失,那麼建議不要開啟訊息佇列的同步刷盤,而是需要使用叢集的方式來解決,可以配置當所有ISR Follower都接收到訊息才返回成功。

2.如果對訊息的丟失有一定的容忍度,那麼建議不部署叢集,即使以叢集方式部署,也建議配置只發送給一個Follower就可以返回成功了。

3.我們的業務系統一般對於訊息的丟失有一定的容忍度,比如說以上面的紅包系統為例,如果紅包訊息丟失了,我們只要後續給沒有傳送紅包的使用者補發紅包就好了。

3.在消費的過程中存在訊息丟失的可能

我還是以Kafka為例來說明。一個消費者消費訊息的進度是記錄在訊息佇列叢集中的,而消費的過程分為三步:接收訊息、處理訊息、更新消費進度。

這裡面接收訊息和處理訊息的過程都可能會發生異常或者失敗,比如說,訊息接收時網路發生抖動,導致訊息並沒有被正確的接收到;處理訊息時可能發生一些業務的異常導致處理流程未執行完成,這時如果更新消費進度,那麼這條失敗的訊息就永遠不會被處理了,也可以認為是丟失了。

所以,在這裡你需要注意的是,一定要等到訊息接收和處理完成後才能更新消費進度,但是這也會造成訊息重複的問題,比方說某一條訊息在處理之後,消費者恰好宕機了,那麼因為沒有更新消費進度,所以當這個消費者重啟之後,還會重複地消費這條訊息。

如何保證訊息只被消費一次

只要保證即使消費到了重複的訊息,從消費的最終結果來看和只消費一次是等同的就好了,也就是保證在訊息的生產和消費的過程是“冪等”的。

1.什麼是冪等

如果我們消費一條訊息的時候,要給現有的庫存數量減1,那麼如果消費兩條相同的訊息就會給庫存數量減2,這就不是冪等的。而如果消費一條訊息後,處理邏輯是將庫存的數量設定為0,或者是如果當前庫存數量是10時則減1,這樣在消費多條訊息時,所得到的結果就是相同的,這就是冪等的。

說白了,你可以這麼理解“冪等”:一件事兒無論做多少次都和做一次產生的結果是一樣的,那麼這件事兒就具有冪等性。

2.在生產、消費過程中增加訊息冪等性的保證

在訊息生產過程中,在Kafka0.11版本和Pulsar中都支援“producer idempotency”的特性,翻譯過來就是生產過程的冪等性,這種特性保證訊息雖然可能在生產端產生重複,但是最終在訊息佇列儲存時只會儲存一份。

它的做法是給每一個生產者一個唯一的ID,並且為生產的每一條訊息賦予一個唯一ID,訊息佇列的服務端會儲存<生產者ID,最後一條訊息ID>的對映。當某一個生產者產生新的訊息時,訊息佇列服務端會比對訊息ID是否與儲存的最後一條ID一致,如果一致,就認為是重複的訊息,服務端會自動丟棄。

而在消費端,冪等性的保證會稍微複雜一些,你可以從通用層和業務層兩個層面來考慮。

在通用層面,你可以在訊息被生產的時候,使用發號器給它生成一個全域性唯一的訊息ID,訊息被處理之後,把這個ID儲存在資料庫中,在處理下一條訊息之前,先從資料庫裡面查詢這個全域性ID是否被消費過,如果被消費過就放棄消費。

不過這樣會有一個問題:如果訊息在處理之後,還沒有來得及寫入資料庫,消費者宕機了重啟之後發現數據庫中並沒有這條訊息,還是會重複執行兩次消費邏輯,這時你就需要引入事務機制,保證訊息處理和寫入資料庫必須同時成功或者同時失敗,但是這樣訊息處理的成本就更高了,所以,如果對於訊息重複沒有特別嚴格的要求,可以直接使用這種通用的方案,而不考慮引入事務。

在業務層面怎麼處理呢?這裡有很多種處理方式,其中有一種是增加樂觀鎖的方式。比如,你的訊息處理程式需要給一個人的賬號加錢,那麼你可以通過樂觀鎖的方式來解決。

具體的操作方式是這樣的:你給每個人的賬號資料中增加一個版本號的欄位,在生產訊息時先查詢這個賬戶的版本號,並且將版本號連同訊息一起傳送給訊息佇列。消費端在拿到訊息和版本號後,在執行更新賬戶金額SQL的時候帶上版本號,類似於執行:

update user set amount = amount + 20, version=version+1 where userId=1 and version=1;

在更新資料時給資料加了樂觀鎖,這樣在消費第一條訊息時,version值為1,SQL可以執行成功,並且同時把version值改為了2;在執行第二條相同的訊息時,由於version值不再是1,所以這條SQL不能執行成功,也就保證了訊息的冪等性。