1. 程式人生 > 其它 >訊息佇列RocketMQ高階原理

訊息佇列RocketMQ高階原理

基本概念

1 訊息模型(Message Model)

RocketMQ主要由ProducerBrokerConsumer三部分組成,其中Producer負責生產訊息,Consumer負責消費訊息,Broker負責儲存訊息。Broker在實際部署過程中對應一臺伺服器,每個Broker可以儲存多個Topic的訊息,每個Topic的訊息也可以分片儲存於不同的BrokerMessage Queue用於儲存訊息的實體地址,每個Topic中的訊息地址儲存於多個Message Queue中(預設4個)。ConsumerGroup由多個Consumer例項構成。

2 訊息生產者(Producer)

負責生產訊息,一般由業務系統負責生產訊息。一個訊息生產者會把業務應用系統裡產生的訊息傳送到broker

伺服器。RocketMQ提供多種傳送方式,同步傳送、非同步傳送、順序傳送、單向傳送。同步和非同步方式均需要Broker返回確認資訊,單向傳送不需要。

3 訊息消費者(Consumer)

負責消費訊息,一般是後臺系統負責非同步消費。一個訊息消費者會從Broker伺服器拉取訊息、並將其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

4 主題(Topic)

表示一類訊息的集合,每個主題包含若干條訊息,每條訊息只能屬於一個主題,是RocketMQ進行訊息訂閱的基本單位。

5 代理伺服器(Broker Server)

訊息中轉角色,負責儲存訊息、轉發訊息。代理伺服器在RocketMQ

系統中負責接收從生產者傳送來的訊息並存儲、同時為消費者的拉取請求作準備。代理伺服器也儲存訊息相關的元資料,包括消費者組、消費進度偏移和主題和佇列訊息等。

BrokerServerBroker主要負責訊息的儲存、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模組。

  1. Remoting Module:整個Broker的實體,負責處理來自clients端的請求。
  2. Client Manager:負責管理客戶端(Producer/Consumer)和維護ConsumerTopic訂閱資訊
  3. Store Service:提供方便簡單的API介面處理訊息儲存到物理硬碟和查詢功能。
  4. HA Service:高可用服務,提供Master BrokerSlave Broker之間的資料同步功能。
  5. Index Service:根據特定的Message key對投遞到Broker的訊息進行索引服務,以提供訊息的快速查詢。

6 名字服務(Name Server)

名稱服務充當路由訊息的提供者。生產者或消費者能夠通過名字服務查詢各主題相應的Broker IP列表。多個Namesrv例項組成叢集,但相互獨立,沒有資訊交換。

7 拉取式消費(Pull Consumer)

Consumer消費的一種型別,應用通常主動呼叫Consumer的拉訊息方法從Broker伺服器拉訊息、主動權由應用控制。一旦獲取了批量訊息,應用就會啟動消費過程。

8 推動式消費(Push Consumer)

Consumer消費的一種型別,該模式下Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。

9 生產者組(Producer Group)

同一類Producer的集合,這類Producer傳送同一類訊息且傳送邏輯一致。如果傳送的是事務訊息且原始生產者在傳送之後崩潰,則Broker伺服器會聯絡同一生產者組的其他生產者例項以提交或回溯消費。

10 消費者組(Consumer Group)

同一類Consumer的集合,這類Consumer通常消費同一類訊息且消費邏輯一致。消費者組使得在訊息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者例項必須訂閱完全相同的TopicRocketMQ支援兩種訊息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。

11 叢集消費(Clustering)

叢集消費模式下,相同Consumer Group的每個Consumer例項平均分攤訊息。

12 廣播消費(Broadcasting)

廣播消費模式下,相同Consumer Group的每個Consumer例項都接收全量的訊息。

13 普通順序訊息(Normal Ordered Message)

普通順序消費模式下,消費者通過同一個訊息佇列(Topic分割槽,稱作Message Queue)收到的訊息是有順序的,不同訊息佇列收到的訊息則可能是無順序的。

14 嚴格順序訊息(Strictly Ordered Message)

嚴格順序訊息模式下,消費者收到的所有訊息均是有順序的。

15 訊息(Message)

訊息系統所傳輸資訊的物理載體,生產和消費資料的最小單位,每條訊息必須屬於一個主題。RocketMQ中每個訊息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message IDKey查詢訊息的功能。

16 標籤(Tag)

為訊息設定的標誌,用於同一主題下區分不同型別的訊息。來自同一業務單元的訊息,可以根據不同業務目的在同一主題下設定不同標籤。標籤能夠有效地保持程式碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴充套件性。

二、特性

1 訂閱與釋出

訊息的釋出是指某個生產者向某個topic傳送訊息;訊息的訂閱是指某個消費者關注了某個topic中帶有某些tag的訊息,進而從該topic消費資料。

2 訊息順序

訊息有序指的是一類訊息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條訊息分別是訂單建立、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以並行消費的。RocketMQ可以嚴格的保證訊息有序。

順序訊息分為全域性順序訊息與分割槽順序訊息,全域性順序是指某個Topic下的所有訊息都要保證順序;部分順序訊息只要保證每一組訊息被順序消費即可。

  • 全域性順序
    對於指定的一個Topic,所有訊息按照嚴格的先入先出(FIFO)的順序進行釋出和消費。
    適用場景:效能要求不高,所有的訊息嚴格按照FIFO原則進行訊息釋出和消費的場景
  • 分割槽順序
    對於指定的一個Topic,所有訊息根據sharding key進行區塊分割槽。同一個分割槽內的訊息按照嚴格的FIFO順序進行釋出和消費。Sharding key是順序訊息中用來區分不同分割槽的關鍵欄位,和普通訊息的Key是完全不同的概念。
    適用場景:效能要求高,以sharding key作為分割槽欄位,在同一個區塊中嚴格的按照FIFO原則進行訊息釋出和消費的場景。

3 訊息過濾

RocketMQ的消費者可以根據Tag進行訊息過濾,也支援自定義屬性過濾。訊息過濾目前是在Broker端實現的,優點是減少了對於Consumer無用訊息的網路傳輸,缺點是增加了Broker的負擔、而且實現相對複雜。

4 訊息可靠性

RocketMQ支援訊息的高可靠,影響訊息可靠性的幾種情況:

  1. Broker非正常關閉
  2. Broker異常Crash
  3. OS Crash
  4. 機器掉電,但是能立即恢復供電情況
  5. 機器無法開機(可能是cpu、主機板、記憶體等關鍵裝置損壞)
  6. 磁碟裝置損壞

1)、2)、3)、4) 四種情況都屬於硬體資源可立即恢復情況,RocketMQ在這四種情況下能保證訊息不丟,或者丟失少量資料(依賴刷盤方式是同步還是非同步)。

5)、6)屬於單點故障,且無法恢復,一旦發生,在此單點上的訊息全部丟失。RocketMQ在這兩種情況下,通過非同步複製,可保證99%的訊息不丟,但是仍然會有極少量的訊息可能丟失。通過同步雙寫技術可以完全避免單點,同步雙寫勢必會影響效能,適合對訊息可靠性要求極高的場合,例如與Money相關的應用。注:RocketMQ3.0版本開始支援同步雙寫。

5 至少一次

至少一次(At least Once)指每個訊息必須投遞一次。ConsumerPull訊息到本地,消費完成後,才向伺服器返回ack,如果沒有消費一定不會ack訊息,所以RocketMQ可以很好的支援此特性。

6 回溯消費

回溯消費是指Consumer已經消費成功的訊息,由於業務上需求需要重新消費,要支援此功能,Broker在向Consumer投遞成功訊息後,訊息仍然需要保留。並且重新消費一般是按照時間維度,例如由於Consumer系統故障,恢復後需要重新消費1小時前的資料,那麼Broker要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ支援按照時間回溯消費,時間維度精確到毫秒。

7 事務訊息

事務訊息(Transactional Message)是指應用本地事務和傳送訊息操作可以被定義到全域性事務中,要麼同時成功,要麼同時失敗。RocketMQ的事務訊息提供類似X/Open XA的分佈事務功能,通過事務訊息能達到分散式事務的最終一致。

8 定時訊息

定時訊息(延遲佇列)是指訊息傳送到broker後,不會立即被消費,等待特定時間投遞給真正的topic
broker有配置項messageDelayLevel,預設值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18level。可以配置自定義messageDelayLevel。注意,messageDelayLevelbroker的屬性,不屬於某個topic。發訊息時,設定delayLevel等級即可:msg.setDelayLevel(level)level有以下三種情況:

  • level == 0,訊息為非延遲訊息
  • 1 <= level <= maxLevel,訊息延遲特定時間,例如level ==1,延遲1s
  • level > maxLevel,則level == maxLevel,例如level ==20,延遲2h

定時訊息會暫存在名為SCHEDULE_TOPIC_XXXXtopic中,並根據delayTimeLevel存入特定的queuequeueId = delayTimeLevel – 1,即一個queue只存相同延遲的訊息,保證具有相同傳送延遲的訊息能夠順序消費。broker會排程地消費SCHEDULE_TOPIC_XXXX,將訊息寫入真實的topic

需要注意的是,定時訊息會在第一次寫入和排程寫入真實topic時都會計數,因此傳送數量、tps都會變高。

9 訊息重試

Consumer消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer消費訊息失敗通常可以認為有以下幾種情況:

  • 由於訊息本身的原因,例如反序列化失敗,訊息資料本身無法處理(例如話費充值,當前訊息的手機號被登出,無法充值)等。這種錯誤通常需要跳過這條訊息,再消費其它訊息,而這條失敗的訊息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過10秒後再重試。
  • 由於依賴的下游應用服務不可用,例如db連線不可用,外系統網路不可達等。遇到這種錯誤,即使跳過當前失敗的訊息,消費其他訊息同樣也會報錯。這種情況建議應用sleep 30s,再消費下一條訊息,這樣可以減輕Broker重試訊息的壓力。

RocketMQ會為每個消費組都設定一個Topic名稱為“%RETRY%+consumerGroup”的重試佇列(這裡需要注意的是,這個Topic的重試佇列是針對消費組,而不是針對每個Topic設定的),用於暫時儲存因為各種異常而導致Consumer端無法消費的訊息。考慮到異常恢復起來需要一些時間,會為重試佇列設定多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ對於重試訊息的處理是先儲存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲佇列中,後臺定時任務按照對應的時間進行Delay後重新儲存至“%RETRY%+consumerGroup”的重試佇列中。

10 訊息重投

生產者在傳送訊息時,同步訊息失敗會重投,非同步訊息有重試,oneway沒有任何保證。訊息重投保證訊息儘可能傳送成功、不丟失,但可能會造成訊息重複,訊息重複在RocketMQ中是無法避免的問題。訊息重複在一般情況下不會發生,當出現訊息量大、網路抖動,訊息重複就會是大概率事件。另外,生產者主動重發、consumer負載變化也會導致重複訊息。如下方法可以設定訊息重試策略:

  • retryTimesWhenSendFailed:同步傳送失敗重投次數,預設為2,因此生產者會最多嘗試傳送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的broker,嘗試向其他broker傳送,最大程度保證訊息不丟。超過重投次數,丟擲異常,由客戶端保證訊息不丟。當出現RemotingExceptionMQClientException和部分MQBrokerException時會重投。
  • retryTimesWhenSendAsyncFailed:非同步傳送失敗重試次數,非同步重試不會選擇其他broker,僅在同一個broker上做重試,不保證訊息不丟。
  • retryAnotherBrokerWhenNotStoreOK:訊息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試傳送到其他broker,預設false。十分重要訊息可以開啟。

11 流量控制

生產者流控,因為broker處理能力達到瓶頸;消費者流控,因為消費能力達到瓶頸。

生產者流控:

  • commitLog檔案被鎖時間超過osPageCacheBusyTimeOutMills時,引數預設為1000ms,返回流控。
  • 如果開啟transientStorePoolEnable == true,且broker為非同步刷盤的主機,且transientStorePool中資源不足,拒絕當前send請求,返回流控。
  • broker每隔10ms檢查send請求佇列頭部請求的等待時間,如果超過waitTimeMillsInSendQueue,預設200ms,拒絕當前send請求,返回流控。
  • broker通過拒絕send請求方式實現流量控制。

注意,生產者流控,不會嘗試訊息重投。

消費者流控:

  • 消費者本地快取訊息數超過pullThresholdForQueue時,預設1000
  • 消費者本地快取訊息大小超過pullThresholdSizeForQueue時,預設100MB
  • 消費者本地快取訊息跨度超過consumeConcurrentlyMaxSpan時,預設2000

消費者流控的結果是降低拉取頻率。

12 死信佇列

死信佇列用於處理無法被正常消費的訊息。當一條訊息初次消費失敗,訊息佇列會自動進行訊息重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該訊息,此時,訊息佇列不會立刻將訊息丟棄,而是將其傳送到該消費者對應的特殊佇列中。

RocketMQ將這種正常情況下無法被消費的訊息稱為死信訊息(Dead-Letter Message),將儲存死信訊息的特殊佇列稱為死信佇列(Dead-Letter Queue)。在RocketMQ中,可以通過使用console控制檯對死信佇列中的訊息進行重發來使得消費者例項再次進行消費。

三、訊息儲存

訊息儲存是RocketMQ中最為複雜和最為重要的一部分,本節將分別從RocketMQ訊息儲存整體架構、PageCacheMmap記憶體對映以及RocketMQ中兩種不同的刷盤方式三方面來分別展開敘述。RocketMQ底層優化:順序寫、非同步刷、零拷貝

3.1 訊息儲存整體架構

訊息儲存架構圖中主要有下面三個跟訊息儲存相關的檔案構成。

(1) CommitLog:訊息主體以及元資料的儲存主體,儲存Producer端寫入的訊息主體內容,訊息內容不是定長的。單個檔案大小預設1G,檔名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個檔案,起始偏移量為0,檔案大小為1G=1073741824;當第一個檔案寫滿了,第二個檔案為00000000001073741824,起始偏移量為1073741824,以此類推。訊息主要是順序寫入日誌檔案,當檔案滿了,寫入下一個檔案;

(2) ConsumeQueue:訊息消費佇列,引入的目的主要是提高訊息消費的效能,由於RocketMQ是基於主題topic的訂閱模式,訊息消費是針對主題進行的,如果要遍歷commitlog檔案中根據topic檢索訊息是非常低效的。Consumer即可根據ConsumeQueue來查詢待消費的訊息。其中,ConsumeQueue(邏輯消費佇列)作為消費訊息的索引,儲存了指定Topic下的佇列訊息在CommitLog中的起始物理偏移量offset,訊息大小size和訊息TagHashCode值。consumequeue檔案可以看成是基於topiccommitlog索引檔案,故consumequeue資料夾的組織方式如下:topic/queue/file三層組織結構,具體儲存路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue檔案採取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog物理偏移量、4位元組的訊息長度、8位元組tag hashcode,單個檔案由30W個條目組成,可以像陣列一樣隨機訪問每一個條目,每個ConsumeQueue檔案大小約5.72M

(3) IndexFileIndexFile(索引檔案)提供了一種可以通過key或時間區間來查詢訊息的方法。Index檔案的儲存位置是:$HOME\store\index\${fileName},檔名fileName是以建立時的時間戳命名的,固定的單個IndexFile檔案大小約為400M,一個IndexFile可以儲存2000W個索引,IndexFile的底層儲存設計為在檔案系統中實現HashMap結構,故rocketmq的索引檔案其底層實現為hash索引。

在上面的RocketMQ的訊息儲存整體架構圖中可以看出,RocketMQ採用的是混合型的儲存結構,即為Broker單個例項下所有的佇列共用一個日誌資料檔案(即為CommitLog)來儲存。RocketMQ的混合型儲存結構(多個Topic的訊息實體內容都儲存於一個CommitLog中)針對ProducerConsumer分別採用了資料和索引部分相分離的儲存結構,Producer傳送訊息至Broker端,然後Broker端使用同步或者非同步的方式對訊息刷盤持久化,儲存至CommitLog中。只要訊息被刷盤持久化至磁碟檔案CommitLog中,那麼Producer傳送的訊息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條訊息。當無法拉取到訊息後,可以等下一次訊息拉取,同時服務端也支援長輪詢模式,如果一個訊息拉取請求未拉取到訊息,Broker允許等待30s的時間,只要這段時間內有新訊息到達,將直接返回給消費端。這裡,RocketMQ的具體做法是,使用Broker端的後臺服務執行緒—ReputMessageService不停地分發請求並非同步構建ConsumeQueue(邏輯消費佇列)和IndexFile(索引檔案)資料。

3.2 頁快取與記憶體對映

頁快取(PageCache)是OS對檔案的快取,用於加速對檔案的讀寫。一般來說,程式對檔案進行順序讀寫的速度幾乎接近於記憶體的讀寫速度,主要原因就是由於OS使用PageCache機制對讀寫訪問操作進行了效能優化,將一部分的記憶體用作PageCache。對於資料的寫入,OS會先寫入至Cache內,隨後通過非同步的方式由pdflush核心執行緒將Cache內的資料刷盤至物理磁碟上。對於資料的讀取,如果一次讀取檔案時出現未命中PageCache的情況,OS從物理磁碟上訪問讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取。

RocketMQ中,ConsumeQueue邏輯消費佇列儲存的資料較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue檔案的讀效能幾乎接近讀記憶體,即使在有訊息堆積情況下也不會影響效能。而對於CommitLog訊息儲存的日誌資料檔案來說,讀取訊息內容時候會產生較多的隨機訪問讀取,嚴重影響效能。如果選擇合適的系統IO排程演算法,比如設定排程演算法為“Deadline”(此時塊儲存採用SSD的話),隨機讀的效能也會有所提升。

另外,RocketMQ主要通過MappedByteBuffer對檔案進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁碟上的物理檔案直接對映到使用者態的記憶體地址中(這種Mmap的方式減少了傳統IO將磁碟檔案資料在作業系統核心地址空間的緩衝區和使用者應用程式地址空間的緩衝區之間來回進行拷貝的效能開銷),將對檔案的操作轉化為直接對記憶體地址進行操作,從而極大地提高了檔案的讀寫效率(正因為需要使用記憶體對映機制,故RocketMQ的檔案儲存都使用定長結構來儲存,方便一次將整個檔案對映至記憶體)。

3.3 訊息刷盤

(1) 同步刷盤:如上圖所示,只有在訊息真正持久化至磁碟後RocketMQBroker端才會真正返回給Producer端一個成功的ACK響應。同步刷盤對MQ訊息可靠性來說是一種不錯的保障,但是效能上會有較大影響,一般適用於金融業務應用該模式較多。

(2) 非同步刷盤:能夠充分利用OSPageCache的優勢,只要訊息寫入PageCache即可將成功的ACK返回給Producer端。訊息刷盤採用後臺非同步執行緒提交的方式進行,降低了讀寫延遲,提高了MQ的效能和吞吐量。

四、通訊機制

RocketMQ訊息佇列叢集主要包括NameServerBroker(Master/Slave)、ProducerConsumer4個角色,基本通訊流程如下:

(1) Broker啟動後需要完成一次將自己註冊至NameServer的操作;隨後每隔30s時間定時向NameServer上報Topic路由資訊。

(2) 訊息生產者Producer作為客戶端傳送訊息時候,需要根據訊息的Topic從本地快取的TopicPublishInfoTable獲取路由資訊。如果沒有則更新路由資訊會從NameServer上重新拉取,同時Producer會預設每隔30sNameServer拉取一次路由資訊。

(3) 訊息生產者Producer根據2)中獲取的路由資訊選擇一個佇列(MessageQueue)進行訊息傳送;Broker作為訊息的接收者接收訊息並落盤儲存。

(4) 訊息消費者Consumer根據2)中獲取的路由資訊,並再完成客戶端的負載均衡後,選擇其中的某一個或者某幾個訊息佇列來拉取訊息並進行消費。

從上面1)~3)中可以看出在訊息生產者,BrokerNameServer之間都會發生通訊(這裡只說了MQ的部分通訊),因此如何設計一個良好的網路通訊模組在MQ中至關重要,它將決定RocketMQ叢集整體的訊息傳輸能力與最終的效能。

rocketmq-remoting模組是RocketMQ訊息佇列中負責網路通訊的模組,它幾乎被其他所有需要網路通訊的模組(諸如rocketmq-clientrocketmq-brokerrocketmq-namesrv)所依賴和引用。為了實現客戶端與伺服器之間高效的資料請求與接收,RocketMQ訊息佇列自定義了通訊協議並在Netty的基礎之上擴充套件了通訊模組。

4.1 Remoting通訊類結構

4.2 協議設計與編解碼

ClientServer之間完成一次訊息傳送時,需要對傳送的訊息進行一個協議約定,因此就有必要自定義RocketMQ的訊息協議。同時,為了高效地在網路中傳輸訊息和對收到的訊息讀取,就需要對訊息進行編解碼。在RocketMQ中,RemotingCommand這個類在訊息傳輸過程中對所有資料內容的封裝,不但包含了所有的資料結構,還包含了編碼解碼操作。

Header欄位 型別 Request說明 Response說明
code int 請求操作碼,應答方根據不同的請求碼進行不同的業務處理 應答響應碼。0表示成功,非0則表示各種錯誤
language LanguageCode 請求方實現的語言 應答方實現的語言
version int 請求方程式的版本 應答方程式的版本
opaque int 相當於requestId,在同一個連線上的不同請求標識碼,與響應訊息中的相對應 應答不做修改直接返回
flag int 區分是普通RPC還是onewayRPC的標誌 區分是普通RPC還是onewayRPC的標誌
remark String 傳輸自定義文字資訊 傳輸自定義文字資訊
extFields HashMap 請求自定義擴充套件資訊 響應自定義擴充套件資訊

可見傳輸內容主要可以分為以下4部分:

(1) 訊息長度:總長度,四個位元組儲存,佔用一個int型別;
(2) 序列化型別&訊息頭長度:同樣佔用一個int型別,第一個位元組表示序列化型別,後面三個位元組表示訊息頭長度;
(3) 訊息頭資料:經過序列化後的訊息頭資料;
(4) 訊息主體資料:訊息主體的二進位制位元組資料內容;

4.3 訊息的通訊方式和流程

RocketMQ訊息佇列中支援通訊的方式主要有同步(sync)、非同步(async)、單向(oneway)三種。其中“單向”通訊模式相對簡單,一般用在傳送心跳包場景下,無需關注其Response。這裡,主要介紹RocketMQ的非同步通訊流程。

4.4 Reactor多執行緒設計

RocketMQRPC通訊採用Netty元件作為底層通訊庫,同樣也遵循了Reactor多執行緒模型,同時又在這之上做了一些擴充套件和優化。

上面的框圖中可以大致瞭解RocketMQNettyRemotingServerReactor多執行緒模型。一個Reactor主執行緒(eventLoopGroupBoss,即為上面的1)負責監聽TCP網路連線請求,建立好連線,建立SocketChannel,並註冊到selector上。RocketMQ的原始碼中會自動根據OS的型別選擇NIOEpoll,也可以通過引數配置),然後監聽真正的網路資料。拿到網路資料後,再丟給Worker執行緒池(eventLoopGroupSelector,即為上面的“N”,原始碼中預設設定為3),在真正執行業務邏輯之前需要進行SSL驗證、編解碼、空閒檢查、網路連線管理,這些工作交給defaultEventExecutorGroup(即為上面的“M1”,原始碼中預設設定為8)去做。而處理業務操作放在業務執行緒池中執行,根據RomotingCommand的業務請求碼codeprocessorTable這個本地快取變數中找到對應的processor,然後封裝成task任務後,提交給對應的業務processor處理執行緒池來執行(sendMessageExecutor,以傳送訊息為例,即為上面的“M2”)。從入口到業務邏輯的幾個步驟中執行緒池一直再增加,這跟每一步邏輯複雜性相關,越複雜,需要的併發通道越寬。

執行緒數 執行緒名 執行緒具體說明
1 NettyBoss_%d Reactor 主執行緒
N NettyServerEPOLLSelector_%d_%d Reactor 執行緒池
M1 NettyServerCodecThread_%d Worker執行緒池
M2 RemotingExecutorThread_%d 業務processor處理執行緒池

五、負載均衡

RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端傳送訊息時候的負載均衡和Consumer端訂閱訊息的負載均衡。

5.1 Producer的負載均衡

Producer端在傳送訊息的時候,會先根據Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由資訊後,RocketMQ的客戶端在預設方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個佇列(MessageQueue)進行傳送訊息。具體的容錯策略均在MQFaultStrategy這個類中定義。這裡有一個sendLatencyFaultEnable開關變數,如果開啟,在隨機遞增取模的基礎上,再過濾掉not availableBroker代理。所謂的"latencyFaultTolerance",是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關閉,採用隨機遞增取模的方式選擇一個佇列(MessageQueue)來發送訊息,latencyFaultTolerance機制是實現訊息傳送高可用的核心關鍵所在。

5.2 Consumer的負載均衡

RocketMQ中,Consumer端的兩種消費模式(Push/Pull)都是基於拉模式來獲取訊息的,而在Push模式只是對pull模式的一種封裝,其本質實現為訊息拉取執行緒在從伺服器拉取到一批訊息後,然後提交到訊息消費執行緒池後,又“馬不停蹄”的繼續向伺服器再次嘗試拉取訊息。如果未拉取到訊息,則延遲一下又繼續拉取。在兩種基於拉模式的消費方式(Push/Pull)中,均需要Consumer端在知道從Broker端的哪一個訊息佇列—佇列中去獲取訊息。因此,有必要在Consumer端來做負載均衡,即Broker端中多個MessageQueue分配給同一個ConsumerGroup中的哪些Consumer消費。

1、Consumer端的心跳包傳送

Consumer啟動後,它就會通過定時任務不斷地向RocketMQ叢集中的所有Broker例項傳送心跳包(其中包含了,訊息消費分組名稱、訂閱關係集合、訊息通訊模式和客戶端id的值等資訊)。Broker端在收到Consumer的心跳訊息後,會將它維護在ConsumerManager的本地快取變數—consumerTable,同時並將封裝後的客戶端網路通道資訊儲存在本地快取變數—channelInfoTable中,為之後做Consumer端的負載均衡提供可以依據的元資料資訊。

2、Consumer端實現負載均衡的核心類—RebalanceImpl

Consumer例項的啟動流程中的啟動MQClientInstance例項部分,會完成負載均衡服務執行緒—RebalanceService的啟動(每隔20s執行一次)。通過檢視原始碼可以發現,RebalanceService執行緒的run()方法最終呼叫的是RebalanceImpl類的rebalanceByTopic()方法,該方法是實現Consumer端負載均衡的核心。這裡,rebalanceByTopic()方法會根據消費者通訊型別為“廣播模式”還是“叢集模式”做不同的邏輯處理。這裡主要來看下叢集模式下的主要處理流程:

(1) 從rebalanceImpl例項的本地快取變數—topicSubscribeInfoTable中,獲取該Topic主題下的訊息消費佇列集合(mqSet);

(2) 根據topicconsumerGroup為引數呼叫mQClientFactory.findConsumerIdList()方法向Broker端傳送獲取該消費組下消費者Id列表的RPC通訊請求(Broker端基於前面Consumer端上報的心跳包資料而構建的consumerTable做出響應返回,業務請求碼:GET_CONSUMER_LIST_BY_GROUP);

(3) 先對Topic下的訊息消費佇列、消費者Id排序,然後用訊息佇列分配策略演算法(預設為:訊息佇列的平均分配演算法),計算出待拉取的訊息佇列。這裡的平均分配演算法,類似於分頁的演算法,將所有MessageQueue排好序類似於記錄,將所有消費端Consumer排好序類似頁數,並求出每一頁需要包含的平均size和每個頁面記錄的範圍range,最後遍歷整個range而計算出當前Consumer端應該分配到的記錄(這裡即為:MessageQueue)。

(4) 然後,呼叫updateProcessQueueTableInRebalance()方法,具體的做法是,先將分配到的訊息佇列集合(mqSet)與processQueueTable做一個過濾比對。

  • 上圖中processQueueTable標註的紅色部分,表示與分配到的訊息佇列集合mqSet互不包含。將這些佇列設定Dropped屬性為true,然後檢視這些佇列是否可以移除出processQueueTable快取變數,這裡具體執行removeUnnecessaryMessageQueue()方法,即每隔1s檢視是否可以獲取當前消費處理佇列的鎖,拿到的話返回true。如果等待1s後,仍然拿不到當前消費處理佇列的鎖則返回false。如果返回true,則從processQueueTable快取變數中移除對應的Entry
  • 上圖中processQueueTable的綠色部分,表示與分配到的訊息佇列集合mqSet的交集。判斷該ProcessQueue是否已經過期了,在Pull模式的不用管,如果是Push模式的,設定Dropped屬性為true,並且呼叫removeUnnecessaryMessageQueue()方法,像上面一樣嘗試移除Entry

最後,為過濾後的訊息佇列集合(mqSet)中的每個MessageQueue建立一個ProcessQueue物件並存入RebalanceImplprocessQueueTable佇列中(其中呼叫RebalanceImpl例項的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue物件的下一個進度消費值offset,隨後填充至接下來要建立的pullRequest物件屬性中),並建立拉取請求物件—pullRequest新增到拉取列表—pullRequestList中,最後執行dispatchPullRequest()方法,將Pull訊息的請求物件PullRequest依次放入PullMessageService服務執行緒的阻塞佇列pullRequestQueue中,待該服務執行緒取出後向Broker端發起Pull訊息的請求。其中,可以重點對比下,RebalancePushImplRebalancePullImpl兩個實現類的dispatchPullRequest()方法不同,RebalancePullImpl類裡面的該方法為空,這樣子也就回答了上一篇中最後的那道思考題了。

訊息消費佇列在同一消費組不同消費者之間的負載均衡,其核心設計理念是在一個訊息消費佇列在同一時間只允許被同一消費組內的一個消費者消費,一個訊息消費者能同時消費多個訊息佇列。

六、事務訊息

Apache RocketMQ4.3.0版中已經支援分散式事務訊息,這裡RocketMQ採用了2PC的思想來實現了提交事務訊息,同時增加一個補償邏輯來處理二階段超時或者失敗的訊息,如下圖所示。

6.1 RocketMQ事務訊息流程概要

上圖說明了事務訊息的大致方案,其中分為兩個流程:正常事務訊息的傳送及提交、事務訊息的補償流程。

1.事務訊息傳送及提交:

(1) 傳送訊息(half訊息)。
(2) 服務端響應訊息寫入結果。
(3) 根據傳送結果執行本地事務(如果寫入失敗,此時half訊息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit或者RollbackCommit操作生成訊息索引,訊息對消費者可見)

2.補償流程:

(1) 對沒有Commit/Rollback的事務訊息(pending狀態的訊息),從服務端發起一次“回查”
(2) Producer收到回查訊息,檢查回查訊息對應的本地事務的狀態
(3) 根據本地事務狀態,重新Commit或者Rollback

其中,補償階段用於解決訊息Commit或者Rollback發生超時或者失敗的情況。

6.2 RocketMQ事務訊息設計

1.事務訊息在一階段對使用者不可見

RocketMQ事務訊息的主要流程中,一階段的訊息如何對使用者不可見。其中,事務訊息相對普通訊息最大的特點就是一階段傳送的訊息對使用者是不可見的。那麼,如何做到寫入訊息但是對使用者不可見呢?RocketMQ事務訊息的做法是:如果訊息是half訊息,將備份原訊息的主題與訊息消費佇列,然後改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由於消費組未訂閱該主題,故消費端無法消費half型別的訊息,然後RocketMQ會開啟一個定時任務,從TopicRMQ_SYS_TRANS_HALF_TOPIC中拉取訊息進行消費,根據生產者組獲取一個服務提供者傳送回查事務狀態請求,根據事務狀態來決定是提交或回滾訊息。

RocketMQ中,訊息在服務端的儲存結構如下,每條訊息都會有對應的索引資訊,Consumer通過ConsumeQueue這個二級索引來讀取訊息實體內容,其流程如下:

RocketMQ的具體實現策略是:寫入的如果事務訊息,對訊息的TopicQueue等屬性進行替換,同時將原來的TopicQueue資訊儲存到訊息的屬性中,正因為訊息主題被替換,故訊息並不會轉發到該原主題的訊息消費佇列,消費者無法感知訊息的存在,不會消費。其實改變訊息主題是RocketMQ的常用“套路”,回想一下延時訊息的實現機制。

2.Commit和Rollback操作以及Op訊息的引入

在完成一階段寫入一條對使用者不可見的訊息後,二階段如果是Commit操作,則需要讓訊息對使用者可見;如果是Rollback則需要撤銷一階段的訊息。先說Rollback的情況。對於Rollback,本身一階段的訊息對使用者是不可見的,其實不需要真正撤銷訊息(實際上RocketMQ也無法去真正的刪除一條訊息,因為是順序寫檔案的)。但是區別於這條訊息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條訊息的最終狀態。RocketMQ事務訊息方案中引入了Op訊息的概念,用Op訊息標識事務訊息已經確定的狀態(Commit或者Rollback)。如果一條事務訊息沒有對應的Op訊息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op訊息後,事務訊息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對於Rollback只是在寫入Op訊息前建立Half訊息的索引。

3.Op訊息的儲存和對應關係

RocketMQOp訊息寫入到全域性一個特定的Topic中通過原始碼中的方法—TransactionalMessageUtil.buildOpTopic();這個Topic是一個內部的Topic(像Half訊息的Topic一樣),不會被使用者消費。Op訊息的內容為對應的Half訊息的儲存的Offset,這樣通過Op訊息能索引到Half訊息進行後續的回查操作。

4.Half訊息的索引構建

在執行二階段Commit操作時,需要構建出Half訊息的索引。一階段的Half訊息由於是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half訊息,並將TopicQueue替換成真正的目標的TopicQueue,之後通過一次普通訊息的寫入操作來生成一條對使用者可見的訊息。所以RocketMQ事務訊息二階段其實是利用了一階段儲存的訊息的內容,在二階段時恢復出一條完整的普通訊息,然後走一遍訊息寫入流程。

5.如何處理二階段失敗的訊息?

如果在RocketMQ事務訊息的二階段過程中失敗了,例如在做Commit操作時,出現網路問題導致Commit失敗,那麼需要通過一定的策略使這條訊息最終被CommitRocketMQ採用了一種補償機制,稱為“回查”。Broker端對未確定狀態的訊息發起回查,將訊息傳送到對應的Producer端(同一個GroupProducer),由Producer根據訊息來檢查本地事務的狀態,進而執行Commit或者RollbackBroker端通過對比Half訊息和Op訊息進行事務訊息的回查並且推進CheckPoint(記錄那些事務訊息的狀態是確定的)。

值得注意的是,rocketmq並不會無休止的的資訊事務狀態回查,預設回查15次,如果15次回查還是無法得知事務狀態,rocketmq預設回滾該訊息。

七、訊息查詢

RocketMQ支援按照下面兩種維度進行訊息查詢

  1. 按照Message Id查詢訊息
  2. 按照Message Key查詢訊息

7.1 按照MessageId查詢訊息

RocketMQ中的MessageId的長度總共有16位元組,其中包含了訊息儲存主機地址(IP地址和埠),訊息Commit Log offset。“按照MessageId查詢訊息”在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和埠)和Commit Log的偏移地址後封裝成一個RPC請求後通過Remoting通訊層傳送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取訊息的過程用其中的commitLog offsetsizecommitLog中找到真正的記錄並解析成一個完整的訊息返回。

7.2 按照Message Key查詢訊息

“按照Message Key查詢訊息”,主要是基於RocketMQIndexFile索引檔案來實現的。RocketMQ的索引檔案邏輯結構,類似JDKHashMap的實現。索引檔案的具體結構如下:

IndexFile索引檔案為使用者提供通過“按照Message Key查詢訊息”的訊息索引查詢服務,IndexFile檔案的儲存位置是:$HOME\store\index\${fileName},檔名fileName是以建立時的時間戳命名的,檔案大小是固定的,等於40+500W\*4+2000W\*20= 420000040個位元組大小。如果訊息的properties中設定了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEYvalue作為key來做寫入操作。如果訊息設定了KEYS屬性(多個KEY以空格分隔),也會用topic + “#” + KEY來做索引。

其中的索引資料包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset這四個欄位,一共20ByteNextIndex offset即前面讀出來的slotValue,如果有hash衝突,就可以用這個欄位將所有衝突的索引用連結串列的方式串起來了。Timestamp記錄的是訊息storeTimestamp之間的差,並不是一個絕對的時間。整個Index File的結構如圖,40ByteHeader用於儲存一些總的統計資訊,4\*500WSlot Table並不儲存真正的索引資料,而是儲存每個槽位對應的單向連結串列的頭。20\*2000W是真正的索引資料,即一個Index File可以儲存2000W個索引。

“按照Message Key查詢訊息”的方式,RocketMQ的具體做法是,主要通過Broker端的QueryMessageProcessor業務處理器來查詢,讀取訊息的過程就是用topickey找到IndexFile索引檔案中的一條記錄,根據其中的commitLog offsetCommitLog檔案中讀取訊息的實體內容。