RocketMQ簡介及實踐
What is RocketMQ
Apache RocketMQ是一個分布式消息傳遞和流平臺,具有低延遲,高性能和可靠性,萬億級容量和靈活的可擴展性。 它由四部分組成:NamerServer,Broker,Produer和Customer。 它們中的每一個都可以水平擴展而沒有單一的故障點。 如下面的截圖所示。
NameServer Cluster
NameServers提供輕量級服務發現和路由。 每個NameServer記錄完整的路由信息,提供相應的讀寫服務,並支持快速存儲擴展。
NameServer是一個幾乎無狀態的節點,可集群部署,節點之間無任何信息同步。
將NameServer地址列表提供給客戶端有四種方法:
- 編程方式,例如 producer.setNamesrvAddr("ip:port")
- Java選項,使用 rocketmq.namesrv.addr
- 環境變量,使用 NAMESRV_ADDR
- HTTP端點。
Broker Cluster
Brokers通過提供輕量級的TOPIC和QUEUE機制來處理消息存儲。 它們支持Push和Pull模型,包含容錯機制(2個副本或3個副本),並提供強大的峰值填充和按原始時間順序累積數千億條消息的能力。 此外,Brokers還提供災難恢復,豐富的指標統計和警報機制,這些都是傳統的消息傳遞系統所缺少的。
Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slaver,但是一個Slaver只能對應一個Master,Master與Slaver的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時註冊Topic信息到所有的NameServer。
如下圖所示,Broker有幾個重要的子模塊組成:
- 遠程處理模塊,即Broker的入口,處理來自客戶端的請求
- 客戶端管理模塊,管理客戶端(生產者/消費者)並維護消費者的主題訂閱
- 存儲服務,提供簡單的API來存儲或查詢物理磁盤中的消息
- HA服務,提供主從broker之間的數據同步功能
- 索引服務,按指定的key來為消息創建索引並提供快速消息查詢
Producer Cluster
Produers支持分布式部署。 Distributed Producers通過多種負載均衡模式向Broker集群發送消息。發送過程支持快速故障並具有低延遲。 Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Produce完全無狀態。Customers Cluster
Customers也支持Push和Pull兩種模式的分布式部署。 它還支持群集消費和消息廣播。 它提供實時消息訂閱機制,可以滿足大多數消費者的需求。
Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master、Slaver建立長連接,且定時向Master、Slaver發送心跳。Consumer即可從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
Best Practices
Produer最佳實踐
發送消息註意事項
1.一個應用盡可能用一個 Topic,消息子類型用 tags 來標識,tags 可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用 tags 在 broker 做消息過濾。
2.每個消息在業務層面的唯一標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過 topic,key 來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。
3.消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。
4.send 消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態:
- SEND_OK:消息發送成功
- FLUSH_DISK_TIMEOUT:消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失
- FLUSH_SLAVE_TIMEOUT:消息發送成功,但是服務器同步到 Slave 時超時,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失
- SLAVE_NOT_AVAILABLE:消息發送成功,但是此時 slave 不可用,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失。對於精確發送順序消息的應用,由於順序消息的局限性,可能會涉及到主備自動切換問題,所以如果sendresult 中的 status 字段不等於 SEND_OK,就應該嘗試重試。對於其他應用,則沒有必要這樣
5.對於消息不可丟失應用,務必要有消息重發機制
消息發送失敗處理
Producer 的 send 方法本身支持內部重試,重試邏輯如下:
- 至多重試 3 次
- 如果發送失敗,則輪轉到下一個 Broker
- 這個方法的總耗時時間不超過 sendMsgTimeout 設置的值,默認 10s所以,如果本身向 broker 發送消息產生超時異常,就不會再做重試
如果調用 send 同步方法發送失敗,則嘗試將消息存儲到 db,由後臺線程定時重試,保證消息一定到達 Broker。
選擇 oneway 形式發送
一個 RPC 調用,通常是這樣一個過程
- 客戶端發送請求到服務器
- 服務器處理該請求
- 服務器向客戶端返回應答
所以一個 RPC 的耗時時間是上述三個步驟的總和,而某些場景要求耗時非常短,但是對可靠性要求並不高,例如日誌收集類應用,此類應用可以采用 oneway 形式調用,oneway 形式只發送請求不等待應答,而發送請求在客戶端實現層面僅僅是一個 os 系統調用的開銷,即將數據寫入客戶端的 socket 緩沖區,此過程耗時通常在微秒級。RocketMQ不止可以直接推送消息,在消費端註冊監聽器進行監聽,還可以由消費端決定自己去拉取數據。
Consumer最佳實踐
消費過程要做到冪等
RocketMQ無法做到消息重復,所以如果業務對消息重復非常敏感,務必要在業務層面去重。將消息的唯一鍵,可以是MsgId,也可以是消息內容中的唯一標識字段,例如訂單ID,消費之前判斷是否在DB或Tair(全局KV存儲)中存在,如果不存在則插入,並消費,否則跳過。(實踐過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過) msgid一定是全局唯一的標識符,但是可能會存在同樣的消息有兩個不同的msgid的情況(有多種原因),這種情況可能會使業務上重復,建議最好使用消息體中的唯一標識字段去重。
批量方式消費
如果業務流程支持批量方式消費,則可以很大程度上的提高吞吐量,可以通過設置Consumer的consumerMessageBatchMaxSize參數,默認是1,即一次消費一條。
跳過非重要的消息
發生消息堆積時,如果消費速度一直跟不上發送速度,可以選擇丟棄不重要的消息。例如當前offset和maxOffset差值過大時(可能時因為消息系統堆積),直接把當前消息消費成功,可以快速使消息的消費和發出達到平衡。
優化消息消費過程
根據實際業務需要,盡可能的優化代碼,減少DB訪問數量,進而減少RT,提高消息的消費速度。
順序消息
RocketMQ通過輪詢所有隊列的方式來確定消息被發送到哪一個隊列(負載均衡策略)。比如下面的示例中,訂單號相同的消息會被先後發送到同一個隊列中:
// RocketMQ通過MessageQueueSelector中實現的算法來確定消息發送到哪一個隊列上 // RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash // 當然你可以根據業務實現自己的MessageQueueSelector來決定消息按照何種策略發送到消息隊列中 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
事務消息
RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。
MQ與DB一致性
A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務一致性,通過引入中間層MQ,A和MQ保持事務一致性(異常情況下通過MQ反查A接口實現check),B和MQ保證事務一致(通過重試),從而達到最終事務一致性。
上面以DB為例,其實此處可以是任何業務或者數據源。
TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調(上圖中灰色部分)。這種消息丟失只存在於斷網或者rocketmq集群掛了的情況下。當rocketmq集群掛了,如果采用異步刷盤,存在1s內數據丟失風險,異步刷盤場景下保障事務沒有意義。所以如果要核心業務用Rocketmq解決分布式事務問題,建議選擇同步刷盤模式。
多系統之間數據一致性
當需要保證多方(超過2方)的分布式一致性,上面的兩方事務一致性(通過Rocketmq的事務性消息解決)已經無法支持。這個時候需要引入TCC模式思想。
以上圖交易系統為例:
1)交易系統創建訂單(往DB插入一條記錄),同時發送訂單創建消息。通過RocketMq事務性消息保證一致性
2)接著執行完成訂單所需的同步核心RPC服務(非核心的系統通過監聽MQ消息自行處理,處理結果不會影響交易狀態)。執行成功更改訂單狀態,同時發送MQ消息。
3)交易系統接受自己發送的訂單創建消息,通過定時調度系統創建延時回滾任務(或者使用RocketMq的重試功能,設置第二次發送時間為定時任務的延遲創建時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執行完,訂單狀態還未設置為完成,第二次消費時間可以指定)。延遲任務先通過查詢訂單狀態判斷訂單是否完成,完成則不創建回滾任務,否則創建。 PS:多個RPC可以創建一個回滾任務,通過一個消費組接受一次消息就可以;也可以通過創建多個消費組,一個消息消費多次,每次消費創建一個RPC的回滾任務。 回滾任務失敗,通過MQ的重發來重試。
以上是交易系統和其他系統之間保持最終一致性的解決方案。
參考: http://rocketmq.apache.org/docs/rmq-arc/ https://yq.aliyun.com/articles/624207?utm_content=m_1000012577 https://www.jianshu.com/p/2838890f3284 https://www.jianshu.com/p/453c6e7ff81c
RocketMQ簡介及實踐