訊息佇列RocketMQ高階原理
基本概念
1 訊息模型(Message Model)
RocketMQ
主要由Producer
、Broker
、Consumer
三部分組成,其中Producer
負責生產訊息,Consumer
負責消費訊息,Broker
負責儲存訊息。Broker
在實際部署過程中對應一臺伺服器,每個Broker
可以儲存多個Topic
的訊息,每個Topic
的訊息也可以分片儲存於不同的Broker
。Message Queue
用於儲存訊息的實體地址,每個Topic
中的訊息地址儲存於多個Message Queue
中(預設4個)。ConsumerGroup
由多個Consumer
例項構成。
2 訊息生產者(Producer)
負責生產訊息,一般由業務系統負責生產訊息。一個訊息生產者會把業務應用系統裡產生的訊息傳送到broker
RocketMQ
提供多種傳送方式,同步傳送、非同步傳送、順序傳送、單向傳送。同步和非同步方式均需要Broker
返回確認資訊,單向傳送不需要。
3 訊息消費者(Consumer)
負責消費訊息,一般是後臺系統負責非同步消費。一個訊息消費者會從Broker
伺服器拉取訊息、並將其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
4 主題(Topic)
表示一類訊息的集合,每個主題包含若干條訊息,每條訊息只能屬於一個主題,是RocketMQ
進行訊息訂閱的基本單位。
5 代理伺服器(Broker Server)
訊息中轉角色,負責儲存訊息、轉發訊息。代理伺服器在RocketMQ
BrokerServer
:Broker
主要負責訊息的儲存、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker
包含了以下幾個重要子模組。
-
Remoting Module
:整個Broker
的實體,負責處理來自clients
端的請求。 -
Client Manager
:負責管理客戶端(Producer/Consumer
)和維護Consumer
的Topic
訂閱資訊 -
Store Service
:提供方便簡單的API
介面處理訊息儲存到物理硬碟和查詢功能。 -
HA Service
:高可用服務,提供Master Broker
和Slave Broker
之間的資料同步功能。 -
Index Service
:根據特定的Message key
對投遞到Broker
的訊息進行索引服務,以提供訊息的快速查詢。
6 名字服務(Name Server)
名稱服務充當路由訊息的提供者。生產者或消費者能夠通過名字服務查詢各主題相應的Broker IP
列表。多個Namesrv
例項組成叢集,但相互獨立,沒有資訊交換。
7 拉取式消費(Pull Consumer)
Consumer
消費的一種型別,應用通常主動呼叫Consumer
的拉訊息方法從Broker
伺服器拉訊息、主動權由應用控制。一旦獲取了批量訊息,應用就會啟動消費過程。
8 推動式消費(Push Consumer)
Consumer
消費的一種型別,該模式下Broker
收到資料後會主動推送給消費端,該消費模式一般實時性較高。
9 生產者組(Producer Group)
同一類Producer
的集合,這類Producer
傳送同一類訊息且傳送邏輯一致。如果傳送的是事務訊息且原始生產者在傳送之後崩潰,則Broker
伺服器會聯絡同一生產者組的其他生產者例項以提交或回溯消費。
10 消費者組(Consumer Group)
同一類Consumer
的集合,這類Consumer
通常消費同一類訊息且消費邏輯一致。消費者組使得在訊息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者例項必須訂閱完全相同的Topic
。RocketMQ
支援兩種訊息模式:叢集消費(Clustering
)和廣播消費(Broadcasting
)。
11 叢集消費(Clustering)
叢集消費模式下,相同Consumer Group
的每個Consumer
例項平均分攤訊息。
12 廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group
的每個Consumer
例項都接收全量的訊息。
13 普通順序訊息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個訊息佇列(Topic
分割槽,稱作Message Queue
)收到的訊息是有順序的,不同訊息佇列收到的訊息則可能是無順序的。
14 嚴格順序訊息(Strictly Ordered Message)
嚴格順序訊息模式下,消費者收到的所有訊息均是有順序的。
15 訊息(Message)
訊息系統所傳輸資訊的物理載體,生產和消費資料的最小單位,每條訊息必須屬於一個主題。RocketMQ
中每個訊息擁有唯一的Message ID
,且可以攜帶具有業務標識的Key
。系統提供了通過Message ID
和Key
查詢訊息的功能。
16 標籤(Tag)
為訊息設定的標誌,用於同一主題下區分不同型別的訊息。來自同一業務單元的訊息,可以根據不同業務目的在同一主題下設定不同標籤。標籤能夠有效地保持程式碼的清晰度和連貫性,並優化RocketMQ
提供的查詢系統。消費者可以根據Tag
實現對不同子主題的不同消費邏輯,實現更好的擴充套件性。
二、特性
1 訂閱與釋出
訊息的釋出是指某個生產者向某個topic
傳送訊息;訊息的訂閱是指某個消費者關注了某個topic
中帶有某些tag
的訊息,進而從該topic
消費資料。
2 訊息順序
訊息有序指的是一類訊息消費時,能按照發送的順序來消費。例如:一個訂單產生了三條訊息分別是訂單建立、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以並行消費的。RocketMQ
可以嚴格的保證訊息有序。
順序訊息分為全域性順序訊息與分割槽順序訊息,全域性順序是指某個Topic
下的所有訊息都要保證順序;部分順序訊息只要保證每一組訊息被順序消費即可。
- 全域性順序
對於指定的一個Topic
,所有訊息按照嚴格的先入先出(FIFO
)的順序進行釋出和消費。
適用場景:效能要求不高,所有的訊息嚴格按照FIFO
原則進行訊息釋出和消費的場景 - 分割槽順序
對於指定的一個Topic
,所有訊息根據sharding key
進行區塊分割槽。同一個分割槽內的訊息按照嚴格的FIFO
順序進行釋出和消費。Sharding key
是順序訊息中用來區分不同分割槽的關鍵欄位,和普通訊息的Key
是完全不同的概念。
適用場景:效能要求高,以sharding key
作為分割槽欄位,在同一個區塊中嚴格的按照FIFO
原則進行訊息釋出和消費的場景。
3 訊息過濾
RocketMQ
的消費者可以根據Tag
進行訊息過濾,也支援自定義屬性過濾。訊息過濾目前是在Broker
端實現的,優點是減少了對於Consumer
無用訊息的網路傳輸,缺點是增加了Broker
的負擔、而且實現相對複雜。
4 訊息可靠性
RocketMQ
支援訊息的高可靠,影響訊息可靠性的幾種情況:
-
Broker
非正常關閉 -
Broker
異常Crash
OS Crash
- 機器掉電,但是能立即恢復供電情況
- 機器無法開機(可能是
cpu
、主機板、記憶體等關鍵裝置損壞) - 磁碟裝置損壞
1)、2)、3)、4) 四種情況都屬於硬體資源可立即恢復情況,RocketMQ
在這四種情況下能保證訊息不丟,或者丟失少量資料(依賴刷盤方式是同步還是非同步)。
5)、6)屬於單點故障,且無法恢復,一旦發生,在此單點上的訊息全部丟失。RocketMQ
在這兩種情況下,通過非同步複製,可保證99%的訊息不丟,但是仍然會有極少量的訊息可能丟失。通過同步雙寫技術可以完全避免單點,同步雙寫勢必會影響效能,適合對訊息可靠性要求極高的場合,例如與Money
相關的應用。注:RocketMQ
從3.0
版本開始支援同步雙寫。
5 至少一次
至少一次(At least Once
)指每個訊息必須投遞一次。Consumer
先Pull
訊息到本地,消費完成後,才向伺服器返回ack
,如果沒有消費一定不會ack
訊息,所以RocketMQ
可以很好的支援此特性。
6 回溯消費
回溯消費是指Consumer
已經消費成功的訊息,由於業務上需求需要重新消費,要支援此功能,Broker
在向Consumer
投遞成功訊息後,訊息仍然需要保留。並且重新消費一般是按照時間維度,例如由於Consumer
系統故障,恢復後需要重新消費1小時前的資料,那麼Broker
要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ
支援按照時間回溯消費,時間維度精確到毫秒。
7 事務訊息
事務訊息(Transactional Message
)是指應用本地事務和傳送訊息操作可以被定義到全域性事務中,要麼同時成功,要麼同時失敗。RocketMQ
的事務訊息提供類似X/Open XA
的分佈事務功能,通過事務訊息能達到分散式事務的最終一致。
8 定時訊息
定時訊息(延遲佇列)是指訊息傳送到broker
後,不會立即被消費,等待特定時間投遞給真正的topic
。broker
有配置項messageDelayLevel
,預設值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
”,18
個level
。可以配置自定義messageDelayLevel
。注意,messageDelayLevel
是broker
的屬性,不屬於某個topic
。發訊息時,設定delayLevel
等級即可:msg.setDelayLevel(level)
。level
有以下三種情況:
-
level == 0
,訊息為非延遲訊息 -
1 <= level <= maxLevel
,訊息延遲特定時間,例如level ==1
,延遲1s
-
level > maxLevel
,則level == maxLevel
,例如level ==20
,延遲2h
定時訊息會暫存在名為SCHEDULE_TOPIC_XXXX
的topic
中,並根據delayTimeLevel
存入特定的queue
,queueId = delayTimeLevel – 1
,即一個queue
只存相同延遲的訊息,保證具有相同傳送延遲的訊息能夠順序消費。broker
會排程地消費SCHEDULE_TOPIC_XXXX
,將訊息寫入真實的topic
。
需要注意的是,定時訊息會在第一次寫入和排程寫入真實topic
時都會計數,因此傳送數量、tps
都會變高。
9 訊息重試
Consumer
消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer
消費訊息失敗通常可以認為有以下幾種情況:
- 由於訊息本身的原因,例如反序列化失敗,訊息資料本身無法處理(例如話費充值,當前訊息的手機號被登出,無法充值)等。這種錯誤通常需要跳過這條訊息,再消費其它訊息,而這條失敗的訊息即使立刻重試消費,
99%
也不成功,所以最好提供一種定時重試機制,即過10
秒後再重試。 - 由於依賴的下游應用服務不可用,例如
db
連線不可用,外系統網路不可達等。遇到這種錯誤,即使跳過當前失敗的訊息,消費其他訊息同樣也會報錯。這種情況建議應用sleep 30s
,再消費下一條訊息,這樣可以減輕Broker
重試訊息的壓力。
RocketMQ
會為每個消費組都設定一個Topic
名稱為“%RETRY%+consumerGroup
”的重試佇列(這裡需要注意的是,這個Topic
的重試佇列是針對消費組,而不是針對每個Topic
設定的),用於暫時儲存因為各種異常而導致Consumer
端無法消費的訊息。考慮到異常恢復起來需要一些時間,會為重試佇列設定多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ
對於重試訊息的處理是先儲存至Topic
名稱為“SCHEDULE_TOPIC_XXXX
”的延遲佇列中,後臺定時任務按照對應的時間進行Delay
後重新儲存至“%RETRY%+consumerGroup
”的重試佇列中。
10 訊息重投
生產者在傳送訊息時,同步訊息失敗會重投,非同步訊息有重試,oneway
沒有任何保證。訊息重投保證訊息儘可能傳送成功、不丟失,但可能會造成訊息重複,訊息重複在RocketMQ
中是無法避免的問題。訊息重複在一般情況下不會發生,當出現訊息量大、網路抖動,訊息重複就會是大概率事件。另外,生產者主動重發、consumer
負載變化也會導致重複訊息。如下方法可以設定訊息重試策略:
-
retryTimesWhenSendFailed
:同步傳送失敗重投次數,預設為2
,因此生產者會最多嘗試傳送retryTimesWhenSendFailed + 1
次。不會選擇上次失敗的broker
,嘗試向其他broker
傳送,最大程度保證訊息不丟。超過重投次數,丟擲異常,由客戶端保證訊息不丟。當出現RemotingException
、MQClientException
和部分MQBrokerException
時會重投。 -
retryTimesWhenSendAsyncFailed
:非同步傳送失敗重試次數,非同步重試不會選擇其他broker
,僅在同一個broker
上做重試,不保證訊息不丟。 -
retryAnotherBrokerWhenNotStoreOK
:訊息刷盤(主或備)超時或slave
不可用(返回狀態非SEND_OK),是否嘗試傳送到其他broker
,預設false
。十分重要訊息可以開啟。
11 流量控制
生產者流控,因為broker
處理能力達到瓶頸;消費者流控,因為消費能力達到瓶頸。
生產者流控:
-
commitLog
檔案被鎖時間超過osPageCacheBusyTimeOutMills
時,引數預設為1000ms,返回流控。 - 如果開啟
transientStorePoolEnable == true
,且broker
為非同步刷盤的主機,且transientStorePool
中資源不足,拒絕當前send
請求,返回流控。 -
broker
每隔10ms檢查send
請求佇列頭部請求的等待時間,如果超過waitTimeMillsInSendQueue
,預設200ms,拒絕當前send
請求,返回流控。 -
broker
通過拒絕send
請求方式實現流量控制。
注意,生產者流控,不會嘗試訊息重投。
消費者流控:
- 消費者本地快取訊息數超過
pullThresholdForQueue
時,預設1000
。 - 消費者本地快取訊息大小超過
pullThresholdSizeForQueue
時,預設100MB
。 - 消費者本地快取訊息跨度超過
consumeConcurrentlyMaxSpan
時,預設2000
。
消費者流控的結果是降低拉取頻率。
12 死信佇列
死信佇列用於處理無法被正常消費的訊息。當一條訊息初次消費失敗,訊息佇列會自動進行訊息重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該訊息,此時,訊息佇列不會立刻將訊息丟棄,而是將其傳送到該消費者對應的特殊佇列中。
RocketMQ
將這種正常情況下無法被消費的訊息稱為死信訊息(Dead-Letter Message
),將儲存死信訊息的特殊佇列稱為死信佇列(Dead-Letter Queue
)。在RocketMQ
中,可以通過使用console
控制檯對死信佇列中的訊息進行重發來使得消費者例項再次進行消費。
三、訊息儲存
訊息儲存是RocketMQ
中最為複雜和最為重要的一部分,本節將分別從RocketMQ
的訊息儲存整體架構、PageCache
與Mmap記憶體對映以及RocketMQ
中兩種不同的刷盤方式三方面來分別展開敘述。RocketMQ
底層優化:順序寫、非同步刷、零拷貝
3.1 訊息儲存整體架構
訊息儲存架構圖中主要有下面三個跟訊息儲存相關的檔案構成。
(1) CommitLog:訊息主體以及元資料的儲存主體,儲存Producer
端寫入的訊息主體內容,訊息內容不是定長的。單個檔案大小預設1G
,檔名長度為20
位,左邊補零,剩餘為起始偏移量,比如00000000000000000000
代表了第一個檔案,起始偏移量為0
,檔案大小為1G
=1073741824
;當第一個檔案寫滿了,第二個檔案為00000000001073741824
,起始偏移量為1073741824
,以此類推。訊息主要是順序寫入日誌檔案,當檔案滿了,寫入下一個檔案;
(2) ConsumeQueue:訊息消費佇列,引入的目的主要是提高訊息消費的效能,由於RocketMQ
是基於主題topic
的訂閱模式,訊息消費是針對主題進行的,如果要遍歷commitlog
檔案中根據topic
檢索訊息是非常低效的。Consumer
即可根據ConsumeQueue
來查詢待消費的訊息。其中,ConsumeQueue
(邏輯消費佇列)作為消費訊息的索引,儲存了指定Topic
下的佇列訊息在CommitLog
中的起始物理偏移量offset
,訊息大小size
和訊息Tag
的HashCode
值。consumequeue
檔案可以看成是基於topic
的commitlog
索引檔案,故consumequeue
資料夾的組織方式如下:topic/queue/file
三層組織結構,具體儲存路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
。同樣consumequeue
檔案採取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog
物理偏移量、4
位元組的訊息長度、8
位元組tag hashcode
,單個檔案由30W
個條目組成,可以像陣列一樣隨機訪問每一個條目,每個ConsumeQueue
檔案大小約5.72M
;
(3) IndexFile:IndexFile
(索引檔案)提供了一種可以通過key
或時間區間來查詢訊息的方法。Index
檔案的儲存位置是:$HOME\store\index\${fileName}
,檔名fileName
是以建立時的時間戳命名的,固定的單個IndexFile
檔案大小約為400M
,一個IndexFile
可以儲存2000W
個索引,IndexFile
的底層儲存設計為在檔案系統中實現HashMap
結構,故rocketmq
的索引檔案其底層實現為hash
索引。
在上面的RocketMQ
的訊息儲存整體架構圖中可以看出,RocketMQ
採用的是混合型的儲存結構,即為Broker
單個例項下所有的佇列共用一個日誌資料檔案(即為CommitLog
)來儲存。RocketMQ
的混合型儲存結構(多個Topic
的訊息實體內容都儲存於一個CommitLog
中)針對Producer
和Consumer
分別採用了資料和索引部分相分離的儲存結構,Producer
傳送訊息至Broker
端,然後Broker
端使用同步或者非同步的方式對訊息刷盤持久化,儲存至CommitLog
中。只要訊息被刷盤持久化至磁碟檔案CommitLog
中,那麼Producer
傳送的訊息就不會丟失。正因為如此,Consumer
也就肯定有機會去消費這條訊息。當無法拉取到訊息後,可以等下一次訊息拉取,同時服務端也支援長輪詢模式,如果一個訊息拉取請求未拉取到訊息,Broker
允許等待30s
的時間,只要這段時間內有新訊息到達,將直接返回給消費端。這裡,RocketMQ
的具體做法是,使用Broker
端的後臺服務執行緒—ReputMessageService
不停地分發請求並非同步構建ConsumeQueue
(邏輯消費佇列)和IndexFile
(索引檔案)資料。
3.2 頁快取與記憶體對映
頁快取(PageCache)是OS
對檔案的快取,用於加速對檔案的讀寫。一般來說,程式對檔案進行順序讀寫的速度幾乎接近於記憶體的讀寫速度,主要原因就是由於OS
使用PageCache
機制對讀寫訪問操作進行了效能優化,將一部分的記憶體用作PageCache
。對於資料的寫入,OS
會先寫入至Cache
內,隨後通過非同步的方式由pdflush
核心執行緒將Cache
內的資料刷盤至物理磁碟上。對於資料的讀取,如果一次讀取檔案時出現未命中PageCache
的情況,OS
從物理磁碟上訪問讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取。
在RocketMQ
中,ConsumeQueue
邏輯消費佇列儲存的資料較少,並且是順序讀取,在page cache
機制的預讀取作用下,Consume Queue
檔案的讀效能幾乎接近讀記憶體,即使在有訊息堆積情況下也不會影響效能。而對於CommitLog
訊息儲存的日誌資料檔案來說,讀取訊息內容時候會產生較多的隨機訪問讀取,嚴重影響效能。如果選擇合適的系統IO
排程演算法,比如設定排程演算法為“Deadline”(此時塊儲存採用SSD
的話),隨機讀的效能也會有所提升。
另外,RocketMQ
主要通過MappedByteBuffer
對檔案進行讀寫操作。其中,利用了NIO
中的FileChannel
模型將磁碟上的物理檔案直接對映到使用者態的記憶體地址中(這種Mmap
的方式減少了傳統IO
將磁碟檔案資料在作業系統核心地址空間的緩衝區和使用者應用程式地址空間的緩衝區之間來回進行拷貝的效能開銷),將對檔案的操作轉化為直接對記憶體地址進行操作,從而極大地提高了檔案的讀寫效率(正因為需要使用記憶體對映機制,故RocketMQ
的檔案儲存都使用定長結構來儲存,方便一次將整個檔案對映至記憶體)。
3.3 訊息刷盤
(1) 同步刷盤:如上圖所示,只有在訊息真正持久化至磁碟後RocketMQ
的Broker
端才會真正返回給Producer
端一個成功的ACK
響應。同步刷盤對MQ
訊息可靠性來說是一種不錯的保障,但是效能上會有較大影響,一般適用於金融業務應用該模式較多。
(2) 非同步刷盤:能夠充分利用OS
的PageCache
的優勢,只要訊息寫入PageCache
即可將成功的ACK
返回給Producer
端。訊息刷盤採用後臺非同步執行緒提交的方式進行,降低了讀寫延遲,提高了MQ
的效能和吞吐量。
四、通訊機制
RocketMQ
訊息佇列叢集主要包括NameServer
、Broker
(Master
/Slave
)、Producer
、Consumer
4個角色,基本通訊流程如下:
(1) Broker
啟動後需要完成一次將自己註冊至NameServer
的操作;隨後每隔30s
時間定時向NameServer
上報Topic
路由資訊。
(2) 訊息生產者Producer
作為客戶端傳送訊息時候,需要根據訊息的Topic
從本地快取的TopicPublishInfoTable
獲取路由資訊。如果沒有則更新路由資訊會從NameServer
上重新拉取,同時Producer
會預設每隔30s
向NameServer
拉取一次路由資訊。
(3) 訊息生產者Producer
根據2
)中獲取的路由資訊選擇一個佇列(MessageQueue
)進行訊息傳送;Broker
作為訊息的接收者接收訊息並落盤儲存。
(4) 訊息消費者Consumer
根據2
)中獲取的路由資訊,並再完成客戶端的負載均衡後,選擇其中的某一個或者某幾個訊息佇列來拉取訊息並進行消費。
從上面1)~3)中可以看出在訊息生產者,Broker
和NameServer
之間都會發生通訊(這裡只說了MQ
的部分通訊),因此如何設計一個良好的網路通訊模組在MQ
中至關重要,它將決定RocketMQ
叢集整體的訊息傳輸能力與最終的效能。
rocketmq-remoting
模組是RocketMQ
訊息佇列中負責網路通訊的模組,它幾乎被其他所有需要網路通訊的模組(諸如rocketmq-client
、rocketmq-broker
、rocketmq-namesrv
)所依賴和引用。為了實現客戶端與伺服器之間高效的資料請求與接收,RocketMQ
訊息佇列自定義了通訊協議並在Netty
的基礎之上擴充套件了通訊模組。
4.1 Remoting通訊類結構
4.2 協議設計與編解碼
在Client
和Server
之間完成一次訊息傳送時,需要對傳送的訊息進行一個協議約定,因此就有必要自定義RocketMQ
的訊息協議。同時,為了高效地在網路中傳輸訊息和對收到的訊息讀取,就需要對訊息進行編解碼。在RocketMQ
中,RemotingCommand
這個類在訊息傳輸過程中對所有資料內容的封裝,不但包含了所有的資料結構,還包含了編碼解碼操作。
Header欄位 | 型別 | Request說明 | Response說明 |
---|---|---|---|
code |
int |
請求操作碼,應答方根據不同的請求碼進行不同的業務處理 | 應答響應碼。0 表示成功,非0 則表示各種錯誤 |
language |
LanguageCode |
請求方實現的語言 | 應答方實現的語言 |
version |
int |
請求方程式的版本 | 應答方程式的版本 |
opaque |
int |
相當於requestId ,在同一個連線上的不同請求標識碼,與響應訊息中的相對應 |
應答不做修改直接返回 |
flag |
int |
區分是普通RPC 還是onewayRPC 的標誌 |
區分是普通RPC 還是onewayRPC 的標誌 |
remark |
String |
傳輸自定義文字資訊 | 傳輸自定義文字資訊 |
extFields |
HashMap |
請求自定義擴充套件資訊 | 響應自定義擴充套件資訊 |
可見傳輸內容主要可以分為以下4部分:
(1) 訊息長度:總長度,四個位元組儲存,佔用一個int
型別;
(2) 序列化型別&訊息頭長度:同樣佔用一個int
型別,第一個位元組表示序列化型別,後面三個位元組表示訊息頭長度;
(3) 訊息頭資料:經過序列化後的訊息頭資料;
(4) 訊息主體資料:訊息主體的二進位制位元組資料內容;
4.3 訊息的通訊方式和流程
在RocketMQ
訊息佇列中支援通訊的方式主要有同步(sync
)、非同步(async
)、單向(oneway
)三種。其中“單向”通訊模式相對簡單,一般用在傳送心跳包場景下,無需關注其Response
。這裡,主要介紹RocketMQ
的非同步通訊流程。
4.4 Reactor多執行緒設計
RocketMQ
的RPC
通訊採用Netty
元件作為底層通訊庫,同樣也遵循了Reactor
多執行緒模型,同時又在這之上做了一些擴充套件和優化。
上面的框圖中可以大致瞭解RocketMQ
中NettyRemotingServer
的Reactor
多執行緒模型。一個Reactor
主執行緒(eventLoopGroupBoss
,即為上面的1)負責監聽TCP
網路連線請求,建立好連線,建立SocketChannel
,並註冊到selector
上。RocketMQ
的原始碼中會自動根據OS
的型別選擇NIO
和Epoll
,也可以通過引數配置),然後監聽真正的網路資料。拿到網路資料後,再丟給Worker
執行緒池(eventLoopGroupSelector
,即為上面的“N”,原始碼中預設設定為3),在真正執行業務邏輯之前需要進行SSL
驗證、編解碼、空閒檢查、網路連線管理,這些工作交給defaultEventExecutorGroup
(即為上面的“M1”,原始碼中預設設定為8)去做。而處理業務操作放在業務執行緒池中執行,根據RomotingCommand
的業務請求碼code
去processorTable
這個本地快取變數中找到對應的processor
,然後封裝成task
任務後,提交給對應的業務processor
處理執行緒池來執行(sendMessageExecutor
,以傳送訊息為例,即為上面的“M2”)。從入口到業務邏輯的幾個步驟中執行緒池一直再增加,這跟每一步邏輯複雜性相關,越複雜,需要的併發通道越寬。
執行緒數 | 執行緒名 | 執行緒具體說明 |
---|---|---|
1 | NettyBoss_%d | Reactor 主執行緒 |
N | NettyServerEPOLLSelector_%d_%d | Reactor 執行緒池 |
M1 | NettyServerCodecThread_%d | Worker執行緒池 |
M2 | RemotingExecutorThread_%d | 業務processor處理執行緒池 |
五、負載均衡
RocketMQ
中的負載均衡都在Client
端完成,具體來說的話,主要可以分為Producer
端傳送訊息時候的負載均衡和Consumer
端訂閱訊息的負載均衡。
5.1 Producer的負載均衡
Producer
端在傳送訊息的時候,會先根據Topic
找到指定的TopicPublishInfo
,在獲取了TopicPublishInfo
路由資訊後,RocketMQ
的客戶端在預設方式下selectOneMessageQueue()
方法會從TopicPublishInfo
中的messageQueueList
中選擇一個佇列(MessageQueue
)進行傳送訊息。具體的容錯策略均在MQFaultStrategy
這個類中定義。這裡有一個sendLatencyFaultEnable
開關變數,如果開啟,在隨機遞增取模的基礎上,再過濾掉not available
的Broker
代理。所謂的"latencyFaultTolerance
",是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency
超過550Lms
,就退避3000Lms
;超過1000L
,就退避60000L
;如果關閉,採用隨機遞增取模的方式選擇一個佇列(MessageQueue
)來發送訊息,latencyFaultTolerance
機制是實現訊息傳送高可用的核心關鍵所在。
5.2 Consumer的負載均衡
在RocketMQ
中,Consumer
端的兩種消費模式(Push/Pull
)都是基於拉模式來獲取訊息的,而在Push
模式只是對pull
模式的一種封裝,其本質實現為訊息拉取執行緒在從伺服器拉取到一批訊息後,然後提交到訊息消費執行緒池後,又“馬不停蹄”的繼續向伺服器再次嘗試拉取訊息。如果未拉取到訊息,則延遲一下又繼續拉取。在兩種基於拉模式的消費方式(Push/Pull
)中,均需要Consumer
端在知道從Broker
端的哪一個訊息佇列—佇列中去獲取訊息。因此,有必要在Consumer
端來做負載均衡,即Broker
端中多個MessageQueue
分配給同一個ConsumerGroup
中的哪些Consumer
消費。
1、Consumer端的心跳包傳送
在Consumer
啟動後,它就會通過定時任務不斷地向RocketMQ
叢集中的所有Broker
例項傳送心跳包(其中包含了,訊息消費分組名稱、訂閱關係集合、訊息通訊模式和客戶端id
的值等資訊)。Broker
端在收到Consumer
的心跳訊息後,會將它維護在ConsumerManager
的本地快取變數—consumerTable
,同時並將封裝後的客戶端網路通道資訊儲存在本地快取變數—channelInfoTable
中,為之後做Consumer
端的負載均衡提供可以依據的元資料資訊。
2、Consumer端實現負載均衡的核心類—RebalanceImpl
在Consumer
例項的啟動流程中的啟動MQClientInstance
例項部分,會完成負載均衡服務執行緒—RebalanceService
的啟動(每隔20s
執行一次)。通過檢視原始碼可以發現,RebalanceService
執行緒的run()
方法最終呼叫的是RebalanceImpl
類的rebalanceByTopic()
方法,該方法是實現Consumer
端負載均衡的核心。這裡,rebalanceByTopic()
方法會根據消費者通訊型別為“廣播模式”還是“叢集模式”做不同的邏輯處理。這裡主要來看下叢集模式下的主要處理流程:
(1) 從rebalanceImpl
例項的本地快取變數—topicSubscribeInfoTable
中,獲取該Topic
主題下的訊息消費佇列集合(mqSet
);
(2) 根據topic
和consumerGroup
為引數呼叫mQClientFactory.findConsumerIdList()
方法向Broker
端傳送獲取該消費組下消費者Id
列表的RPC
通訊請求(Broker
端基於前面Consumer
端上報的心跳包資料而構建的consumerTable
做出響應返回,業務請求碼:GET_CONSUMER_LIST_BY_GROUP
);
(3) 先對Topic
下的訊息消費佇列、消費者Id
排序,然後用訊息佇列分配策略演算法(預設為:訊息佇列的平均分配演算法),計算出待拉取的訊息佇列。這裡的平均分配演算法,類似於分頁的演算法,將所有MessageQueue
排好序類似於記錄,將所有消費端Consumer
排好序類似頁數,並求出每一頁需要包含的平均size
和每個頁面記錄的範圍range
,最後遍歷整個range
而計算出當前Consumer
端應該分配到的記錄(這裡即為:MessageQueue
)。
(4) 然後,呼叫updateProcessQueueTableInRebalance()
方法,具體的做法是,先將分配到的訊息佇列集合(mqSet
)與processQueueTable
做一個過濾比對。
- 上圖中
processQueueTable
標註的紅色部分,表示與分配到的訊息佇列集合mqSet
互不包含。將這些佇列設定Dropped
屬性為true
,然後檢視這些佇列是否可以移除出processQueueTable
快取變數,這裡具體執行removeUnnecessaryMessageQueue()
方法,即每隔1s
檢視是否可以獲取當前消費處理佇列的鎖,拿到的話返回true
。如果等待1s
後,仍然拿不到當前消費處理佇列的鎖則返回false
。如果返回true
,則從processQueueTable
快取變數中移除對應的Entry
; - 上圖中
processQueueTable
的綠色部分,表示與分配到的訊息佇列集合mqSet
的交集。判斷該ProcessQueue
是否已經過期了,在Pull
模式的不用管,如果是Push
模式的,設定Dropped
屬性為true
,並且呼叫removeUnnecessaryMessageQueue()
方法,像上面一樣嘗試移除Entry
;
最後,為過濾後的訊息佇列集合(mqSet
)中的每個MessageQueue
建立一個ProcessQueue
物件並存入RebalanceImpl
的processQueueTable
佇列中(其中呼叫RebalanceImpl
例項的computePullFromWhere(MessageQueue mq)
方法獲取該MessageQueue
物件的下一個進度消費值offset
,隨後填充至接下來要建立的pullRequest
物件屬性中),並建立拉取請求物件—pullRequest
新增到拉取列表—pullRequestList
中,最後執行dispatchPullRequest()
方法,將Pull
訊息的請求物件PullRequest
依次放入PullMessageService
服務執行緒的阻塞佇列pullRequestQueue
中,待該服務執行緒取出後向Broker
端發起Pull
訊息的請求。其中,可以重點對比下,RebalancePushImpl
和RebalancePullImpl
兩個實現類的dispatchPullRequest()
方法不同,RebalancePullImpl
類裡面的該方法為空,這樣子也就回答了上一篇中最後的那道思考題了。
訊息消費佇列在同一消費組不同消費者之間的負載均衡,其核心設計理念是在一個訊息消費佇列在同一時間只允許被同一消費組內的一個消費者消費,一個訊息消費者能同時消費多個訊息佇列。
六、事務訊息
Apache RocketMQ
在4.3.0
版中已經支援分散式事務訊息,這裡RocketMQ
採用了2PC
的思想來實現了提交事務訊息,同時增加一個補償邏輯來處理二階段超時或者失敗的訊息,如下圖所示。
6.1 RocketMQ事務訊息流程概要
上圖說明了事務訊息的大致方案,其中分為兩個流程:正常事務訊息的傳送及提交、事務訊息的補償流程。
1.事務訊息傳送及提交:
(1) 傳送訊息(half
訊息)。
(2) 服務端響應訊息寫入結果。
(3) 根據傳送結果執行本地事務(如果寫入失敗,此時half
訊息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit
或者Rollback
(Commit
操作生成訊息索引,訊息對消費者可見)
2.補償流程:
(1) 對沒有Commit/Rollback
的事務訊息(pending
狀態的訊息),從服務端發起一次“回查”
(2) Producer
收到回查訊息,檢查回查訊息對應的本地事務的狀態
(3) 根據本地事務狀態,重新Commit
或者Rollback
其中,補償階段用於解決訊息Commit
或者Rollback
發生超時或者失敗的情況。
6.2 RocketMQ事務訊息設計
1.事務訊息在一階段對使用者不可見
在RocketMQ
事務訊息的主要流程中,一階段的訊息如何對使用者不可見。其中,事務訊息相對普通訊息最大的特點就是一階段傳送的訊息對使用者是不可見的。那麼,如何做到寫入訊息但是對使用者不可見呢?RocketMQ
事務訊息的做法是:如果訊息是half
訊息,將備份原訊息的主題與訊息消費佇列,然後改變主題為RMQ_SYS_TRANS_HALF_TOPIC
。由於消費組未訂閱該主題,故消費端無法消費half
型別的訊息,然後RocketMQ
會開啟一個定時任務,從Topic
為RMQ_SYS_TRANS_HALF_TOPIC
中拉取訊息進行消費,根據生產者組獲取一個服務提供者傳送回查事務狀態請求,根據事務狀態來決定是提交或回滾訊息。
在RocketMQ
中,訊息在服務端的儲存結構如下,每條訊息都會有對應的索引資訊,Consumer
通過ConsumeQueue
這個二級索引來讀取訊息實體內容,其流程如下:
RocketMQ
的具體實現策略是:寫入的如果事務訊息,對訊息的Topic
和Queue
等屬性進行替換,同時將原來的Topic
和Queue
資訊儲存到訊息的屬性中,正因為訊息主題被替換,故訊息並不會轉發到該原主題的訊息消費佇列,消費者無法感知訊息的存在,不會消費。其實改變訊息主題是RocketMQ
的常用“套路”,回想一下延時訊息的實現機制。
2.Commit和Rollback操作以及Op訊息的引入
在完成一階段寫入一條對使用者不可見的訊息後,二階段如果是Commit
操作,則需要讓訊息對使用者可見;如果是Rollback
則需要撤銷一階段的訊息。先說Rollback
的情況。對於Rollback
,本身一階段的訊息對使用者是不可見的,其實不需要真正撤銷訊息(實際上RocketMQ
也無法去真正的刪除一條訊息,因為是順序寫檔案的)。但是區別於這條訊息沒有確定狀態(Pending
狀態,事務懸而未決),需要一個操作來標識這條訊息的最終狀態。RocketMQ
事務訊息方案中引入了Op
訊息的概念,用Op
訊息標識事務訊息已經確定的狀態(Commit
或者Rollback
)。如果一條事務訊息沒有對應的Op
訊息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op
訊息後,事務訊息無論是Commit
或者Rollback
都會記錄一個Op
操作。Commit
相對於Rollback
只是在寫入Op
訊息前建立Half
訊息的索引。
3.Op訊息的儲存和對應關係
RocketMQ
將Op
訊息寫入到全域性一個特定的Topic
中通過原始碼中的方法—TransactionalMessageUtil.buildOpTopic();
這個Topic
是一個內部的Topic
(像Half
訊息的Topic
一樣),不會被使用者消費。Op
訊息的內容為對應的Half
訊息的儲存的Offset
,這樣通過Op
訊息能索引到Half
訊息進行後續的回查操作。
4.Half訊息的索引構建
在執行二階段Commit
操作時,需要構建出Half
訊息的索引。一階段的Half
訊息由於是寫到一個特殊的Topic
,所以二階段構建索引時需要讀取出Half
訊息,並將Topic
和Queue
替換成真正的目標的Topic
和Queue
,之後通過一次普通訊息的寫入操作來生成一條對使用者可見的訊息。所以RocketMQ
事務訊息二階段其實是利用了一階段儲存的訊息的內容,在二階段時恢復出一條完整的普通訊息,然後走一遍訊息寫入流程。
5.如何處理二階段失敗的訊息?
如果在RocketMQ
事務訊息的二階段過程中失敗了,例如在做Commit
操作時,出現網路問題導致Commit
失敗,那麼需要通過一定的策略使這條訊息最終被Commit
。RocketMQ
採用了一種補償機制,稱為“回查”。Broker
端對未確定狀態的訊息發起回查,將訊息傳送到對應的Producer
端(同一個Group
的Producer
),由Producer
根據訊息來檢查本地事務的狀態,進而執行Commit
或者Rollback
。Broker
端通過對比Half
訊息和Op
訊息進行事務訊息的回查並且推進CheckPoint
(記錄那些事務訊息的狀態是確定的)。
值得注意的是,rocketmq
並不會無休止的的資訊事務狀態回查,預設回查15次,如果15次回查還是無法得知事務狀態,rocketmq
預設回滾該訊息。
七、訊息查詢
RocketMQ
支援按照下面兩種維度進行訊息查詢
- 按照
Message Id
查詢訊息 - 按照
Message Key
查詢訊息
7.1 按照MessageId查詢訊息
RocketMQ
中的MessageId
的長度總共有16
位元組,其中包含了訊息儲存主機地址(IP
地址和埠),訊息Commit Log offset
。“按照MessageId
查詢訊息”在RocketMQ
中具體做法是:Client
端從MessageId
中解析出Broker
的地址(IP
地址和埠)和Commit Log
的偏移地址後封裝成一個RPC
請求後通過Remoting
通訊層傳送(業務請求碼:VIEW_MESSAGE_BY_ID
)。Broker
端走的是QueryMessageProcessor
,讀取訊息的過程用其中的commitLog offset
和size
去commitLog
中找到真正的記錄並解析成一個完整的訊息返回。
7.2 按照Message Key查詢訊息
“按照Message Key
查詢訊息”,主要是基於RocketMQ
的IndexFile
索引檔案來實現的。RocketMQ
的索引檔案邏輯結構,類似JDK
中HashMap
的實現。索引檔案的具體結構如下:
IndexFile
索引檔案為使用者提供通過“按照Message Key
查詢訊息”的訊息索引查詢服務,IndexFile
檔案的儲存位置是:$HOME\store\index\${fileName}
,檔名fileName
是以建立時的時間戳命名的,檔案大小是固定的,等於40+500W\*4+2000W\*20= 420000040
個位元組大小。如果訊息的properties
中設定了UNIQ_KEY
這個屬性,就用 topic + “#” + UNIQ_KEY
的value
作為key
來做寫入操作。如果訊息設定了KEYS
屬性(多個KEY
以空格分隔),也會用topic + “#” + KEY
來做索引。
其中的索引資料包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset
這四個欄位,一共20Byte
。NextIndex offset
即前面讀出來的slotValue
,如果有hash
衝突,就可以用這個欄位將所有衝突的索引用連結串列的方式串起來了。Timestamp
記錄的是訊息storeTimestamp
之間的差,並不是一個絕對的時間。整個Index File
的結構如圖,40Byte
的Header
用於儲存一些總的統計資訊,4\*500W
的Slot Table
並不儲存真正的索引資料,而是儲存每個槽位對應的單向連結串列的頭。20\*2000W
是真正的索引資料,即一個Index File
可以儲存2000W
個索引。
“按照Message Key
查詢訊息”的方式,RocketMQ
的具體做法是,主要通過Broker
端的QueryMessageProcessor
業務處理器來查詢,讀取訊息的過程就是用topic
和key
找到IndexFile
索引檔案中的一條記錄,根據其中的commitLog offset
從CommitLog
檔案中讀取訊息的實體內容。