1. 程式人生 > >RocketMQ基礎概念剖析&原始碼解析

RocketMQ基礎概念剖析&原始碼解析

Topic

Topic是一類訊息的集合,是一種邏輯上的分割槽。為什麼說是邏輯分割槽呢?因為最終資料是儲存到Broker上的,而且為了滿足高可用,採用了分散式的儲存。

這和Kafka中的實現如出一轍,Kafka的Topic也是一種邏輯概念,每個Topic的資料會分成很多份,然後儲存在不同的Broker上,這個「份」叫Partition。而在RocketMQ中,Topic的資料也會分散式的儲存,這個「份」叫MessageQueue。

其分佈可以用下圖來表示。

這樣一來,如果某個Broker所在的機器意外宕機,而且剛好MessageQueue中的資料還沒有持久化到磁碟,那麼該Topic下的這部分訊息就會完全丟失。此時如果有備份的話,MQ就可以繼續對外提供服務。

為什麼還會出現沒有持久化到磁碟的情況呢?現在的OS當中,程式寫入資料到檔案之後,並不會立馬寫入到磁碟,因為磁碟I/O是非常耗時的操作,在計算機來看是非常慢的一種操作。所以寫入檔案的資料會先寫入到OS自己的快取中去,然後擇機非同步的將Buffer中的資料刷入磁碟。

通過多副本冗餘的機制,使得RocketMQ具有了高可用的特性。除此之外,分散式儲存能夠應對後期業務大量的資料儲存。如果不使用分散式進行儲存,那麼隨著後期業務發展,訊息量越來越大,單機是無論如何也滿足不了RocketMQ訊息的儲存需求的。如果不做處理,那麼一臺機器的磁碟總有被塞滿的時候,此時的系統就不具備可伸縮的特性,也無法滿足業務的使用要求了。

但是這裡的可伸縮,和微服務中的服務可伸縮還不太一樣。因為在微服務中,各個服務是無狀態的。而Broker是有狀態的,每個Broker上儲存的資料都不太一樣,因為Producer在傳送訊息的時候會通過指定的演算法,從Message Queue列表中選出一個MessageQueue傳送訊息。

如果不是很理解這個橫向擴充套件,那麼可以把它當成Redis的Cluster,通過一致性雜湊,選擇到Redis Cluster中的具體某個節點,然後將資料寫入Redis Master中去。如果此時想要擴容很方便,只需要往Redis Cluster中新增Master節點就好了。

所以,資料分散式的儲存本質上是一種資料分片的機制。在此基礎上,通過冗餘多副本,達成了高可用。

Broker

Broker可以理解為我們微服務中的一個服務的某個例項,因為微服務中我們的服務一般來說都會多例項部署,而RocketMQ也同理,多例項部署可以幫助系統扛住更多的流量,也從某種方面提高了系統的健壯性。

在RocketMQ4.5之前,它使用主從架構,每一個Master Broker都有一個自己的Slave Broker。

那RocketMQ的主從Broker是如何進行資料同步的呢?

Broker啟動的時候,會啟動一個定時任務,定期的從Master Broker同步全量的資料。

這塊可以先不用糾結,後面我們會通過原始碼來驗證這個主從同步邏輯。

上面提到了Broker會部署很多個例項,那麼既然多例項部署,那必然會存在一個問題,客戶端是如何得知自己是連線的哪個伺服器?如何得知對應的Broker的IP地址和埠?如果某個Broker突然掛了怎麼辦?

NameServer

這就需要NameServer了,NameServer是什麼?

這裡先拿Spring Cloud舉例子——Spring Cloud中服務啟動的時候會將自己註冊到Eureka註冊中心上。當服務例項啟動的時候,會從Eureka拉取全量的登錄檔,並且之後定期的從Eureka增量同步,並且每隔30秒傳送心跳到Eureka去續約。如果Eureka檢測到某個服務超過了90秒沒有傳送心跳,那麼就會該服務宕機,就會將其從登錄檔中移除。

RocketMQ中,NameServer充當的也是類似的角色。兩者從功能上也有一定的區別。

Broker在啟動的時候會向NameServer註冊自己,並且每隔30秒向NameServerv傳送心跳。如果某個Broker超過了120秒沒有傳送心跳,那麼就會認為該Broker宕機,就會將其從維護的資訊中移除。這塊後面也會從原始碼層面驗證。

當然NameServer不僅僅是儲存了各個Broker的IP地址和埠,還儲存了對應的Topic的路由資料。什麼是路由資料呢?那就是某個Topic下的哪個Message Queue在哪臺Broker上。

Producer

總體流程

接下來,我們來看看Producer傳送一條訊息到Broker的時候會做什麼事情,整體的流程如下。

檢查訊息合法性

整體來看,其實是個很簡單的操作,跟我們平時寫程式碼是一樣的,來請求了先校驗請求是否合法。Producer啟動這裡會去校驗當前Topic資料的合法性。

  • Topic名稱中是否包含了非法字元

  • Topic名稱長度是否超過了最大的長度限制,由常量TOPIC_MAX_LENGTH來決定,其預設值為127

  • 當前訊息體是否是NULL或者是空訊息

  • 當前訊息體是否超過了最大限制,由常量maxMessageSize決定,值為1024 * 1024 * 4,也就是4M。

都是些很常規的操作,和我們平時寫的checker都差不多。

獲取Topic的詳情

當通過了訊息的合法性校驗之後,就需要繼續往下走。此時的關注點就應該從訊息是否合法轉移到我要發訊息給誰。

此時就需要通過當前訊息所屬的Topic拿到Topic的詳細資料。

獲取Topic的方法原始碼在上面已經給出來了,首先會從記憶體中維護的一份Map中獲取資料。順帶一提,這裡的Map是ConcurrentHashMap,是執行緒安全的,和Golang中的Sync.Map類似。

當然,首次傳送的話,這個Map肯定是空的,此時會呼叫NameServer的介面,通過Topic去獲取詳情的Topic資料,此時會在上面的方法中將其加入到Map中去,這樣一來下次再往該Topic傳送訊息就能夠直接從記憶體中獲取。這裡就是簡單的實現的快取機制 。

從方法名稱來看,是通過Topic獲取路由資料。實際上該方法,通過呼叫NameServer提供的API,更新了兩部分資料,分別是:

  • Topic路由資訊
  • Topic下的Broker相關資訊

而這兩部分資料都來源於同一個結構體TopicRouteData。其結構如下。

通過原始碼可以看到,就包含了該Topic下所有Broker下的Message Queue相關的資料、所有Broker的地址資訊。

傳送的具體Queue

此時我們獲取到了需要傳送到的Broker詳情,包括地址和MessageQueue,那麼此時問題的關注點又該從「訊息傳送給誰」轉移到「訊息具體傳送到哪兒」。

什麼叫傳送到哪兒?

開篇提到過一個Topic下會被分為很多個MessageQueue,「傳送到哪兒」指的就是具體傳送到哪一個Message Queue中去。

Message Queue選擇機制

核心的選擇邏輯

還是先給出流程圖

核心邏輯,用大白話講就是將一個隨機數和Message Queue的容量取模。這個隨機數儲存在Thread Local中,首次計算的時候,會直接隨機一個數。

此後,都直接從ThreadLocal中取出該值,並且+1返回,拿到了MessageQueue的數量和隨機數兩個關鍵的引數之後,就會執行最終的計算邏輯。

接下來,我們來看看選擇Message Queue的方法SelectOneMessageQueue都做了什麼操作吧。

可以看到,主邏輯被變數sendLatencyFaultEnable分為了兩部分。

容錯機制下的選擇邏輯

該變量表意為傳送延遲故障。本質上是一種容錯的策略,在原有的MessageQueue選擇基礎上,再過濾掉不可用的Broker,對之前失敗的Broker,按一定的時間做退避。

可以看到,如果呼叫Broker資訊發生了異常,那麼就會呼叫updateFault這個方法,來更新Broker的Aviable情況。注意這個引數isolation的值為true。接下來我們從原始碼級別來驗證上面說的退避3000ms的事實。

可以看到,isolation值是true,則duration通過三元運算子計算出來結果為30000,也就是30秒。所以我們可以得出結論,如果傳送訊息丟擲了異常,那麼直接會將該Broker設定為30秒內不可用。

而如果只是傳送延遲較高,則會根據如下的map,根據延遲的具體時間,來判斷該設定多少時間的不可用。

例如,如果上次請求的latency超過550ms,就退避3000ms;超過1000,就退避60000;

正常情況下的選擇邏輯

而正常情況下,如果當前傳送故障延遲沒有啟用,則會走常規邏輯,同樣的會去for迴圈計算,迴圈中取到了MessageQueue之後會去判斷是否和上次選擇的MessageQueue屬於同一個Broker,如果是同一個Broker,則會重新選擇,直到選擇到不屬於同一個Broker的MessageQueue,或者直到迴圈結束。這也是為了將訊息均勻的分發儲存,防止資料傾斜。

傳送訊息

選到了具體的Message Queue之後就會開始執行傳送訊息的邏輯,就會呼叫底層Netty的介面給傳送出去,這塊暫時沒啥可看的。

Broker的啟動流程

主從同步

在上面提到過,RocketMQ有自己的主從同步,但是有兩個不同的版本,版本的分水嶺是在4.5版本。這兩個版本區別是什麼呢?

  • 4.5之前:有點類似於Redis中,我們手動的將某臺機器通過命令slave of 變成另一臺Redis的Slave節點,這樣一來就變成了一個較為原始的一主一從的架構。為什麼說原始呢?因為如果此時Master節點宕機,我們需要人肉的去做故障轉移。RocketMQ的主從架構也是這種情況。
  • 4.5之後:引入了Dleger,可以實現一主多從,並且實現自動的故障轉移。這就跟Redis後續推出了Sentinel是一樣的。Dleger也是類似的作用。

下圖是Broker啟動程式碼中的原始碼。

可以看到判斷了是否開啟了Dleger,預設是不開啟的。所以就會執行其中的邏輯。

剛好我們就看到了,裡面有Rocket主從同步資料的相關程式碼。

如果當前Broker節點的角色是Slave,則會啟動一個週期性的定時任務,定期(也就是10秒)去Master Broker同步全量的資料。同步的資料包括:

  • Topic的相關配置
  • Cosumer的消費偏移量
  • 延遲訊息的Offset
  • 訂閱組的相關資料和配置

註冊Broker

完成了主動同步定時任務的啟動之後,就會去呼叫registerBrokerAll去註冊Broker。可能這裡會有點疑問,我這裡是Broker啟動,只有當前一個Broker例項,那這個All是什麼意思呢?

All是指所有的NameServer,Broker啟動的時候會將自己註冊到每一個NameServer上去。為什麼不只註冊到一個NameServer就完事了呢?這樣一來還可以提高效率。歸根結底還是高可用的問題。

如果Broker只註冊到了一臺NameServer上,萬一這臺NameServer掛了呢?這個Broker對所有客戶端就都不可見了。實際上Broker還在正常的執行。

進到registerBrokerAll中去。

可以看到,這裡會判斷是否需要進行註冊。通過上面的截圖可以看到,此時forceRegister的值為true,而是否要註冊,決定權就交給了needRegister

為什麼需要判斷是否需要註冊呢?因為Broker一旦註冊到了NameServer之後,由於Producer不停的在寫入資料,Consumer也在不停的消費資料,Broker也可能因為故障導致某些Topic下的Message Queue等關鍵的路由資訊發生變動。

這樣一來,NameServer中的資料和Broker中的資料就會不一致。

如何判斷是否需要註冊

大致的思路是,Broker會從每一個NameServer中獲取到當前Broker的資料,並和當前Broker節點中的資料做對比。但凡有一臺NameServer資料和當前Broker不一致,都會進行註冊操作。

接下來,我們從原始碼層面驗證這個邏輯。關鍵的邏輯我在圖中也標註了出來。

可以看到, 就是通過對比Broker中的資料版本和NameServer中的資料版本來實現的。這個版本,註冊的時候會寫到註冊的資料中存入NameServer中。

這裡由於是有多個,所以RocketMQ用執行緒池來實現了多執行緒操作,並且用CountDownLatch來等待所有的返回結果。經典的用空間換時間,Golang裡面也有類似的操作,那就是sync.waitGroup。

關於任何一個數據不匹配,都會進行重新註冊的事實,我們也從原始碼層面來驗證一下。

可以看到,如果任何一臺NameServer的資料發生了Change,都會break,返回true。

這裡的結果列表使用的是CopyOnWriteList來實現的。

因為這裡是多執行緒去執行的判斷邏輯,而正常的列表不是執行緒安全的。CopyOnWriteArrayList之所以是執行緒安全的,這歸功於COW(Copy On Write),讀請求時共用同一個List,涉及到寫請求時,會複製出一個List,並在寫入資料的時候加入獨佔鎖。比起直接對所有操作加鎖,讀寫鎖的形式分離了讀、寫請求,使其互不影響,只對寫請求加鎖,降低了加鎖的次數、減少了加鎖的消耗,提升了整體操作的併發。

執行註冊邏輯

這塊就是構建資料,然後多執行緒併發的去傳送請求,用CopyOnWriteArrayList來儲存結果。不過,上面我們提到過,Broker註冊的時候,會把資料版本傳送到NameServer並且儲存起來,這塊我們可以看看傳送到NameServer的資料結構。

可以看到,Topic的資料分為了兩部分,一部分是核心的邏輯,另一部分是DataVersion,也就是我們剛剛一直提到的資料版本。

Broker如何儲存資料

剛剛在聊Producer最後提到的是,傳送訊息到Broker就完了。不知道大家有沒有想過Broker是如何儲存訊息的?

Commit log

先給出流程圖

然後給出結論,Producer傳送的訊息是儲存在一種叫commit log的檔案中的,Producer端每次寫入的訊息是不等長的,當該CommitLog檔案寫入滿1G,就會新建另一個新的CommitLog,繼續寫入。此次採取的是順序寫入。

那麼問題來了,Consumer來消費的時候,Broker是如何快速找到對應的訊息的呢?我們首先排除遍歷檔案查詢的方法, 因為RocketMQ是以高吞吐、高效能著稱的,肯定不可能採取這種對於很慢的操作。那RocketMQ是如何做的呢?

答案是ConsumerQueue

ConsumerQueue

ConsumerQueue是什麼?是檔案。引入的目的是什麼呢?提高消費的效能。

Broker在收到一條訊息的時候,寫入Commit Log的同時,還會將當前這條訊息在commit log中的offset、訊息的size和對應的Tag的Hash寫入到consumer queue檔案中去。

每個MessageQueue都會有對應的ConsumerQueue檔案儲存在磁碟上,每個ConsumerQueue檔案包含了30W條訊息,每條訊息的size大小為20位元組,包含了8位元組CommitLog的Offset、4位元組的訊息長度、8位元組的Tag的雜湊值。這樣一來,每個ConsumerQueue的檔案大小就約為5.72M。

當該ConsumerQueue檔案寫滿了之後,就會再新建一個ConsumerQueue檔案,繼續寫入。

所以,ConsumerQueue檔案可以看成是CommitLog檔案的索引。

負載均衡

什麼意思呢?假設我們總共有6個MessageQueue,然後此時分佈在了3臺Broker上,每個Broker上包含了兩個queue。此時Consumer有3臺,我們可以大致的認為每個Consumer負責2個MessageQueue的消費。但是這裡有一個原則,那就是一個MessageQueue只能被一臺Consumer消費,而一臺Consumer可以消費多個MessageQueue。

為什麼?道理很簡單,RocketMQ支援的順序消費,是指的分割槽順序性,也就是在單個MessageQueue中,訊息是具有順序性的,而如果多臺Consumer去消費同一個MessageQueue,就很難去保證順序消費了。

由於有很多個Consumer在消費多個MessageQueue,所以為了不出現資料傾斜,也為了資源的合理分配利用,在Producer傳送訊息的時候,需要儘可能的將訊息均勻的分發給多個MessageQueue。

同時,上面那種一個Consumer消費了2個MessageQueue的情況,萬一這臺Consumer掛了呢?這兩個MessageQueue不就沒人消費了?

以上兩種情況分別是Producer端的負載均衡、Consumer端的負載均衡。

Producer端負載均衡

關於Producer端上面的負載均衡,上面的流程圖已經給了出來,並且給出了原始碼的驗證。首先是容錯策略,會去避開一段時間有問題的Broker,並且加上如果選擇了上次的Broker,就會重新進行選擇。

Consumer端負載均衡

首先Consumer端的負責均衡可以由兩個物件觸發:

  • Broker
  • Consumer自身

Consumer也會向所有的Broker傳送心跳,將訊息的消費組名稱、訂閱關係集合、訊息的通訊模式和客戶端的ID等等。Broker收到了Consumer的心跳之後,會將其存在Broker維護的一個Manager中,名字叫ConsumerManager。當Broker監聽到了Consumer數量發生了變動,就會通知Consumer進行Rebalance。

但是如果Broker通知Consumer進行Rebalance的訊息丟了呢?這也就是為什麼需要第Consumer自身進行觸發的原因。Consumer會在啟動的時候啟動定時任務,週期性的執行rebalance操作。

預設是20秒執行一次。具體的程式碼如下。

具體流程

首先,Consumer的Rebalance會獲取到本地快取的Topic的全部資料,然後向Broker發起請求,拉取該Topic和ConsumerGroup下的所有的消費者資訊。此處的Broker資料來源就是Consumer之前的心跳傳送過去的資料。然後會對Topic中MessageQueue和消費者ID進行排序,然後用訊息佇列預設分配演算法來進行分配,這裡的預設分配策略是平均分配。

首先會均勻的按照類似分頁的思想,將MessageQueue分配給Consumer,如果分配的不均勻,則會依次的將剩下的MessageQueue按照排序的順序,從上往下的分配。所以在這裡Consumer 1被分配到了4個MessageQueue,而Consumer 2被分配到了3個MessageQueue。

Rebalance完了之後,會將結果和Consumer快取的資料做對比,移除不在ReBalance結果中的MessageQueue,將原本沒有的MessageQueue給新增到快取中。

觸發時機

  • Consumer啟動時 啟動之後會立馬進行Rebalance
  • Consumer執行中 執行中會監聽Broker傳送過來的Rebalance訊息,以及Consumer自身的定時任務觸發的Rebalance
  • Consumer停止執行 停止時沒有直接的呼叫Rebalance,而是會通知Broker自己下線了,然後Broker會通知其餘的Consumer進行Rebalance。

換一個角度來分析,其實就是兩個方面,一個是佇列資訊發生了變化,另一種是消費者發生了變化。

原始碼驗證

然後給出核心的程式碼驗證,獲取資料的邏輯如下

驗證了我們剛剛說的獲取了本地的Topic資料快取,和從Broker端拉取所有的ConsumerID。

接下來是驗證剛說的排序邏輯。

接下來是看判斷結果是否發生了變化的原始碼。

可以看到,Consumer通知Broker策略,其本質上就是傳送心跳,將更新後的資料通過心跳傳送給所有的Broker。

Consumer更多的細節

可能關於Consumer,我們使用的更多一點。例如我們知道我們可以設定叢集消費和廣播訊息,分別對應RocketMQ中的CLUSTERINGBROADCASTING**。

再比如我們知道,我們可以設定順序消費和併發消費等等,接下來就讓我們用原始碼來看看這些功能在RocketMQ中是怎麼實現的。

消費模型

在Consumer中,預設都是採用叢集消費,這塊在Consumer的程式碼中也有體現。

而消費模式的不同,會影響到管理offset的具體實現。

可以看到,當消費模型是廣播模式時,Offset的持久化管理會使用實現LocalFileOffsetStorage

當消費模式是叢集消費時,則會使用RemoteBrokerOffsetStore。

具體原因是什麼呢?首先我們得知道廣播模式和叢集模式的區別在哪兒:

  • 廣播模式下,一條訊息會被ConsumerGroup中的每一臺機器所消費
  • 叢集模式下,一條訊息只會被ConsumerGroup中的一臺機器消費

所以在廣播模式下,每個ConsumerGroup的消費進度都不一樣,所以需要由Consumer自身來管理Offset。而叢集模式下,同個ConsumerGroup下的消費進度其實是一樣的,所以可以交由Broker統一管理。

消費模式

消費模式則分為順序消費和併發消費,分別對應實現MessageListenerOrderly和MessageListenerConcurrently兩種方式。

不同的消費方式會採取不同的底層實現,配置完成之後就會呼叫start。

拉取訊息

接下來我們來看一個跟我們最最相關的問題,那就是我們平時消費的訊息到底是怎麼樣從Broker發到的Consumer。在靠近啟動Rebalance的地方,Consumer也開啟了一個定時拉取訊息的執行緒。

這個執行緒做了什麼事呢?它會不停的從一個維護在記憶體中的Queue中獲取一個在寫入的時候就構建好的PullRequest物件,呼叫具體實現去不停的拉取訊息了。

處理消費結果

在這裡是否開啟AutoCommit,所做的處理差不了很多,大家也都知道,唯一區別就在於是否自動的提交Offset。對於處理成功的邏輯也差不多,我們平時業務邏輯中可能也並不關心消費成功的訊息。我們更多關注的是如果消費失敗了,RocketMQ是怎麼處理的?

這是在AutoCommit下,如果消費失敗了的處理邏輯。會記錄一個失敗的TPS,然後這裡有一個非常關鍵的邏輯,那就是checkReconsumeTimes。

如果當前訊息的重試次數,如果大於了最大的重試消費次數,就會把消費發回給Broker。那最大重試次數是如何定義的。

如果值為-1,那麼最大次數就是MAX_VALUE,也就是2147483647。這裡有點奇怪啊,按照我們平常的認知,難道不是重試16次嗎?然後就看到了很騷的一句註釋。

-1 means 16 times,這程式碼確實有點,一言難盡。

然後,如果超過了最大的次數限