阿里RocketMQ如何解決訊息的順序和重複兩大硬傷
分散式訊息系統作為實現分散式系統可擴充套件、可伸縮性的關鍵元件,需要具有高吞吐量、高可用等特點。而談到訊息系統的設計,就回避不了兩個問題:
- 訊息的順序問題
- 訊息的重複問題
RocketMQ作為阿里開源的一款高效能、高吞吐量的訊息中介軟體,它是怎樣來解決這兩個問題的?RocketMQ有哪些關鍵特性?其實現原理是怎樣的?
順序訊息
訊息有序指的是可以按照訊息的傳送順序來消費。例如:一筆訂單產生了 3 條訊息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照順序依次消費才有意義。與此同時多筆訂單之間又是可以並行消費的。 首先來看如下示例:
假如生產者產生了2條訊息:M1、M2,要保證這兩條訊息的順序,應該怎樣做?你腦中想到的可能是這樣:
你可能會採用這種方式保證訊息順序
假定M1傳送到S1,M2傳送到S2,如果要保證M1先於M2被消費,那麼需要M1到達消費端被消費後,通知S2,然後S2再將M2傳送到消費端。
這個模型存在的問題是,如果M1和M2分別傳送到兩臺Server上,就不能保證M1先達到MQ叢集,也不能保證M1被先消費。換個角度看,如果M2先於M1達到MQ叢集,甚至M2被消費後,M1才達到消費端,這時訊息也就亂序了,說明以上模型是不能保證訊息的順序的。 如何才能在MQ叢集保證訊息的順序?一種簡單的方式就是將M1、M2傳送到同一個Server上:
保證訊息順序,你改進後的方法
這樣可以保證M1先於M2到達MQServer(生產者等待M1傳送成功後再發送M2),根據先達到先被消費的原則,M1會先於M2被消費,這樣就保證了訊息的順序。
這個模型也僅僅是理論上可以保證訊息的順序,在實際場景中可能會遇到下面的問題:
網路延遲問題
只要將訊息從一臺伺服器發往另一臺伺服器,就會存在網路延遲問題。如上圖所示,如果傳送M1耗時大於傳送M2的耗時,那麼M2就仍將被先消費,仍然不能保證訊息的順序。即使M1和M2同時到達消費端,由於不清楚消費端1和消費端2的負載情況,仍然有可能出現M2先於M1被消費的情況。
那如何解決這個問題?將M1和M2發往同一個消費者,且傳送M1後,需要消費端響應成功後才能傳送M2。
聰明的你可能已經想到另外的問題:如果M1被髮送到消費端後,消費端1沒有響應,那是繼續傳送M2呢,還是重新發送M1?一般為了保證訊息一定被消費,肯定會選擇重發M1到另外一個消費端2,就如下圖所示。
保證訊息順序的正確姿勢
這樣的模型就嚴格保證訊息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種情況,一種是M1確實沒有到達(資料在網路傳送中丟失),另外一種消費端已經消費M1且已經發送響應訊息,只是MQ Server端沒有收到。如果是第二種情況,重發M1,就會造成M1被重複消費。也就引入了我們要說的第二個問題,訊息重複問題,這個後文會詳細講解。
回過頭來看訊息順序問題,嚴格的順序訊息非常容易理解,也可以通過文中所描述的方式來簡單處理。總結起來,要實現嚴格的順序訊息,簡單且可行的辦法就是:
保證生產者 - MQServer - 消費者是一對一對一的關係
這樣的設計雖然簡單易行,但也會存在一些很嚴重的問題,比如:
- 並行度就會成為訊息系統的瓶頸(吞吐量不夠)
- 更多的異常處理,比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。
但我們的最終目標是要叢集的高容錯性和高吞吐量。這似乎是一對不可調和的矛盾,那麼阿里是如何解決的?
世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它!——沈詢
有些問題,看起來很重要,但實際上我們可以通過合理的設計或者將問題分解來規避。如果硬要把時間花在解決問題本身,實際上不僅效率低下,而且也是一種浪費。從這個角度來看訊息的順序問題,我們可以得出兩個結論:
- 不關注亂序的應用實際大量存在
- 佇列無序並不意味著訊息無序
所以從業務層面來保證訊息的順序而不僅僅是依賴於訊息系統,是不是我們應該尋求的一種更合理的方式?
最後我們從原始碼角度分析RocketMQ怎麼實現傳送順序訊息的。
RocketMQ通過輪詢所有佇列的方式來確定訊息被髮送到哪一個佇列(負載均衡策略)。比如下面的示例中,訂單號相同的訊息會被先後傳送到同一個佇列中:
在獲取到路由資訊以後,會根據MessageQueueSelector實現的演算法來選擇一個佇列,同一個OrderId獲取到的肯定是同一個佇列。
訊息重複
上面在解決訊息順序問題時,引入了一個新的問題,就是訊息重複。那麼RocketMQ是怎樣解決訊息重複的問題呢?還是“恰好”不解決。
造 成訊息重複的根本原因是:網路不可達。只要通過網路交換資料,就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:如果消費端收到兩條一樣的訊息,應該怎樣處理?
- 消費端處理訊息的業務邏輯保持冪等性
- 保證每條訊息都有唯一編號且保證訊息處理成功與去重表的日誌同時出現
第1條很好理解,只要保持冪等性,不管來多少條重複訊息,最後處理的結果都一樣。第2條原理就是利用一張日誌表來記錄已經處理成功的訊息的ID,如果新到的訊息ID已經在日誌表中,那麼就不再處理這條訊息。
第1條解決方案,很明顯應該在消費端實現,不屬於訊息系統要實現的功能。第2條可以訊息系統實現,也可以業務端實現。正常情況下出現重複訊息的概率其實很小,如果由訊息系統來實現的話,肯定會對訊息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理訊息重複的問題,這也是RocketMQ不解決訊息重複的問題的原因。
RocketMQ 不保證訊息不重複,如果你的業務需要保證嚴格的不重複訊息,需要你自己在業務端去重。
事務訊息
RocketMQ除了支援普通訊息,順序訊息,另外還支援事務訊息。首先討論一下什麼是事務訊息以及支援事務訊息的必要性。我們以一個轉帳的場景為例來說明這個問題:Bob向Smith轉賬100塊。
在單機環境下,執行事務的情況,大概是下面這個樣子:
單機環境下轉賬事務示意圖
當用戶增長到一定程度,Bob和Smith的賬戶及餘額資訊已經不在同一臺伺服器上了,那麼上面的流程就變成了這樣:
叢集環境下轉賬事務示意圖
這時候你會發現,同樣是一個轉賬的業務,在叢集環境下,耗時居然成倍的增長,這顯然是不能夠接受的。那如何來規避這個問題?
大事務 = 小事務 + 非同步
將大事務拆分成多個小事務非同步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。轉賬的事務就可以分解成如下兩個小事務:
小事務+非同步訊息
圖中執行本地事務(Bob賬戶扣款)和傳送非同步訊息應該保證同時成功或者同時失敗,也就是扣款成功了,傳送訊息一定要成功,如果扣款失敗了,就不能再發送訊息。那問題是:我們是先扣款還是先發送訊息呢?
首先看下先發送訊息的情況,大致的示意圖如下:
事務訊息:先發送訊息
存在的問題是:如果訊息傳送成功,但是扣款失敗,消費端就會消費此訊息,進而向Smith賬戶加錢。
先發訊息不行,那就先扣款吧,大致的示意圖如下:
事務訊息-先扣款
存在的問題跟上面類似:如果扣款成功,傳送訊息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。
可能大家會有很多的方法來解決這個問題,比如:直接將發訊息放到Bob扣款的事務中去,如果傳送失敗,丟擲異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。
這裡需要說明一下: 如果使用Spring來管理事物的話,大可以將傳送訊息的邏輯放到本地事物中去,傳送訊息失敗丟擲異常,Spring捕捉到異常後就會回滾此事物,以此來保證本地事物與傳送訊息的原子性。
RocketMQ支援事務訊息,下面來看看RocketMQ是怎樣來實現的。
RocketMQ實現傳送事務訊息
RocketMQ第一階段傳送Prepared訊息時,會拿到訊息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問訊息,並修改訊息的狀態。
細心的你可能又發現問題了,如果確認訊息傳送失敗了怎麼辦?RocketMQ會定期掃描訊息叢集中的事物訊息,如果發現了Prepared訊息,它會向訊息傳送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續傳送確認訊息呢?RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證了訊息傳送與本地事務同時成功或同時失敗。
那我們來看下RocketMQ原始碼,是如何處理事務訊息的。客戶端傳送事務訊息的部分(完整程式碼請檢視:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):
接著檢視sendMessageInTransaction方法的原始碼,總共分為3個階段:傳送Prepared訊息、執行本地事務、傳送確認訊息。
endTransaction方法會將請求發往broker(mq server)去更新事務訊息的最終狀態:
- 根據sendResult找到Prepared訊息 ,sendResult包含事務訊息的ID
- 根據localTransaction更新訊息的最終狀態
如果endTransaction方法執行失敗,資料沒有傳送到broker,導致事務訊息的 狀態更新失敗,broker會有回查執行緒定時(預設1分鐘)掃描每個儲存事務狀態的表格檔案,如果是已經提交或者回滾的訊息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction請求,Producer會呼叫DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回撥請求,而checkTransactionState會呼叫我們的事務設定的決斷方法來決定是回滾事務還是繼續執行,最後呼叫endTransactionOneway讓broker來更新訊息的最終狀態。
再回到轉賬的例子,如果Bob的賬戶的餘額已經減少,且訊息已經發送成功,Smith端開始消費這條訊息,這個時候就會出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費訊息成功,整個過程中有可能會出現訊息重複的問題,按照前面的思路解決即可。
消費事務訊息
這樣基本上可以解決消費端超時問題,但是如果消費失敗怎麼辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,那麼需要回滾整個流程。如果訊息系統要實現這個回滾流程的話,系統複雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設計實現訊息系統時,我們需要衡量是否值得花這麼大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。
Producer如何傳送訊息
Producer輪詢某topic下的所有佇列的方式來實現傳送方的負載均衡,如下圖所示:
producer傳送訊息負載均衡
首先分析一下RocketMQ的客戶端傳送訊息的原始碼:
在整個應用生命週期內,生產者需要呼叫一次start方法來初始化,初始化主要完成的任務有:
- 如果沒有指定namesrv地址,將會自動定址
- 啟動定時任務:更新namesrv地址、從namsrv更新topic路由資訊、清理已經掛掉的broker、向所有broker傳送心跳…
- 啟動負載均衡的服務
初始化完成後,開始傳送訊息,傳送訊息的主要程式碼如下:
程式碼中需要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲取路由資訊並更新到本地快取,所以tryToFindTopicPublishInfo會首先從快取中獲取topic路由資訊,如果沒有獲取到,則會自己去namesrv獲取路由資訊。selectOneMessageQueue方法通過輪詢的方式,返回一個佇列,以達到負載均衡的目的。
如果Producer傳送訊息失敗,會自動重試,重試的策略:
- 重試次數 < retryTimesWhenSendFailed(可配置)
- 總的耗時(包含重試n次的耗時) < sendMsgTimeout(傳送訊息時傳入的引數)
- 同時滿足上面兩個條件後,Producer會選擇另外一個佇列傳送訊息