1. 程式人生 > >RocketMQ簡介及實踐

RocketMQ簡介及實踐

new 批量 實時消息 pub sgid master 耗時 最終一致性 情況

What is RocketMQ

Apache RocketMQ是一個分布式消息傳遞和流平臺,具有低延遲,高性能和可靠性,萬億級容量和靈活的可擴展性。 它由四部分組成:NamerServer,Broker,Produer和Customer。 它們中的每一個都可以水平擴展而沒有單一的故障點。 如下面的截圖所示。

技術分享圖片

NameServer Cluster

NameServers提供輕量級服務發現和路由。 每個NameServer記錄完整的路由信息,提供相應的讀寫服務,並支持快速存儲擴展。

NameServer是一個幾乎無狀態的節點,可集群部署,節點之間無任何信息同步。

將NameServer地址列表提供給客戶端有四種方法:

  • 編程方式,例如 producer.setNamesrvAddr("ip:port")
  • Java選項,使用 rocketmq.namesrv.addr
  • 環境變量,使用 NAMESRV_ADDR
  • HTTP端點。

Broker Cluster

Brokers通過提供輕量級的TOPIC和QUEUE機制來處理消息存儲。 它們支持Push和Pull模型,包含容錯機制(2個副本或3個副本),並提供強大的峰值填充和按原始時間順序累積數千億條消息的能力。 此外,Brokers還提供災難恢復,豐富的指標統計和警報機制,這些都是傳統的消息傳遞系統所缺少的。

Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slaver,但是一個Slaver只能對應一個Master,Master與Slaver的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時註冊Topic信息到所有的NameServer。

如下圖所示,Broker有幾個重要的子模塊組成:

  • 遠程處理模塊,即Broker的入口,處理來自客戶端的請求
  • 客戶端管理模塊,管理客戶端(生產者/消費者)並維護消費者的主題訂閱
  • 存儲服務,提供簡單的API來存儲或查詢物理磁盤中的消息
  • HA服務,提供主從broker之間的數據同步功能
  • 索引服務,按指定的key來為消息創建索引並提供快速消息查詢

技術分享圖片

Producer Cluster

Produers支持分布式部署。 Distributed Producers通過多種負載均衡模式向Broker集群發送消息。發送過程支持快速故障並具有低延遲。 Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Produce完全無狀態。

Customers Cluster

Customers也支持Push和Pull兩種模式的分布式部署。 它還支持群集消費和消息廣播。 它提供實時消息訂閱機制,可以滿足大多數消費者的需求。

Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer取Topic路由信息,並向提供Topic服務的Master、Slaver建立長連接,且定時向Master、Slaver發送心跳。Consumer即可從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

Best Practices

Produer最佳實踐

發送消息註意事項

1.一個應用盡可能用一個 Topic,消息子類型用 tags 來標識,tags 可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用 tags 在 broker 做消息過濾。

2.每個消息在業務層面的唯一標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過 topic,key 來查詢這條消息內容,以及消息被誰消費。由於是哈希索引,請務必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。

3.消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。

4.send 消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態:

  • SEND_OK:消息發送成功
  • FLUSH_DISK_TIMEOUT:消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失
  • FLUSH_SLAVE_TIMEOUT:消息發送成功,但是服務器同步到 Slave 時超時,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失
  • SLAVE_NOT_AVAILABLE:消息發送成功,但是此時 slave 不可用,消息已經進入服務器隊列,只有此時服務器宕機,消息才會丟失。對於精確發送順序消息的應用,由於順序消息的局限性,可能會涉及到主備自動切換問題,所以如果sendresult 中的 status 字段不等於 SEND_OK,就應該嘗試重試。對於其他應用,則沒有必要這樣

5.對於消息不可丟失應用,務必要有消息重發機制

消息發送失敗處理

Producer 的 send 方法本身支持內部重試,重試邏輯如下:

  1. 至多重試 3 次
  2. 如果發送失敗,則輪轉到下一個 Broker
  3. 這個方法的總耗時時間不超過 sendMsgTimeout 設置的值,默認 10s所以,如果本身向 broker 發送消息產生超時異常,就不會再做重試

如果調用 send 同步方法發送失敗,則嘗試將消息存儲到 db,由後臺線程定時重試,保證消息一定到達 Broker。

選擇 oneway 形式發送

一個 RPC 調用,通常是這樣一個過程

  1. 客戶端發送請求到服務器
  2. 服務器處理該請求
  3. 服務器向客戶端返回應答

所以一個 RPC 的耗時時間是上述三個步驟的總和,而某些場景要求耗時非常短,但是對可靠性要求並不高,例如日誌收集類應用,此類應用可以采用 oneway 形式調用,oneway 形式只發送請求不等待應答,而發送請求在客戶端實現層面僅僅是一個 os 系統調用的開銷,即將數據寫入客戶端的 socket 緩沖區,此過程耗時通常在微秒級。RocketMQ不止可以直接推送消息,在消費端註冊監聽器進行監聽,還可以由消費端決定自己去拉取數據。

Consumer最佳實踐

消費過程要做到冪等

RocketMQ無法做到消息重復,所以如果業務對消息重復非常敏感,務必要在業務層面去重。將消息的唯一鍵,可以是MsgId,也可以是消息內容中的唯一標識字段,例如訂單ID,消費之前判斷是否在DB或Tair(全局KV存儲)中存在,如果不存在則插入,並消費,否則跳過。(實踐過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過) msgid一定是全局唯一的標識符,但是可能會存在同樣的消息有兩個不同的msgid的情況(有多種原因),這種情況可能會使業務上重復,建議最好使用消息體中的唯一標識字段去重。

批量方式消費

如果業務流程支持批量方式消費,則可以很大程度上的提高吞吐量,可以通過設置Consumer的consumerMessageBatchMaxSize參數,默認是1,即一次消費一條。

跳過非重要的消息

發生消息堆積時,如果消費速度一直跟不上發送速度,可以選擇丟棄不重要的消息。例如當前offset和maxOffset差值過大時(可能時因為消息系統堆積),直接把當前消息消費成功,可以快速使消息的消費和發出達到平衡。

優化消息消費過程

根據實際業務需要,盡可能的優化代碼,減少DB訪問數量,進而減少RT,提高消息的消費速度。

順序消息

RocketMQ通過輪詢所有隊列的方式來確定消息被發送到哪一個隊列(負載均衡策略)。比如下面的示例中,訂單號相同的消息會被先後發送到同一個隊列中:

// RocketMQ通過MessageQueueSelector中實現的算法來確定消息發送到哪一個隊列上
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
// 當然你可以根據業務實現自己的MessageQueueSelector來決定消息按照何種策略發送到消息隊列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

事務消息

RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。

MQ與DB一致性

A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務一致性,通過引入中間層MQ,A和MQ保持事務一致性(異常情況下通過MQ反查A接口實現check),B和MQ保證事務一致(通過重試),從而達到最終事務一致性。

技術分享圖片

上面以DB為例,其實此處可以是任何業務或者數據源。

TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調(上圖中灰色部分)。這種消息丟失只存在於斷網或者rocketmq集群掛了的情況下。當rocketmq集群掛了,如果采用異步刷盤,存在1s內數據丟失風險,異步刷盤場景下保障事務沒有意義。所以如果要核心業務用Rocketmq解決分布式事務問題,建議選擇同步刷盤模式。

多系統之間數據一致性技術分享圖片

當需要保證多方(超過2方)的分布式一致性,上面的兩方事務一致性(通過Rocketmq的事務性消息解決)已經無法支持。這個時候需要引入TCC模式思想。

技術分享圖片

以上圖交易系統為例:

1)交易系統創建訂單(往DB插入一條記錄),同時發送訂單創建消息。通過RocketMq事務性消息保證一致性

2)接著執行完成訂單所需的同步核心RPC服務(非核心的系統通過監聽MQ消息自行處理,處理結果不會影響交易狀態)。執行成功更改訂單狀態,同時發送MQ消息。

3)交易系統接受自己發送的訂單創建消息,通過定時調度系統創建延時回滾任務(或者使用RocketMq的重試功能,設置第二次發送時間為定時任務的延遲創建時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執行完,訂單狀態還未設置為完成,第二次消費時間可以指定)。延遲任務先通過查詢訂單狀態判斷訂單是否完成,完成則不創建回滾任務,否則創建。 PS:多個RPC可以創建一個回滾任務,通過一個消費組接受一次消息就可以;也可以通過創建多個消費組,一個消息消費多次,每次消費創建一個RPC的回滾任務。 回滾任務失敗,通過MQ的重發來重試。

以上是交易系統和其他系統之間保持最終一致性的解決方案。



參考: http://rocketmq.apache.org/docs/rmq-arc/ https://yq.aliyun.com/articles/624207?utm_content=m_1000012577 https://www.jianshu.com/p/2838890f3284 https://www.jianshu.com/p/453c6e7ff81c

RocketMQ簡介及實踐