1. 程式人生 > 其它 >訊息佇列RocketMQ應用

訊息佇列RocketMQ應用

一、RocketMQ簡介

RocketMQ是阿里巴巴開源的分散式訊息中介軟體。支援事務訊息、順序訊息、批量訊息、定時訊息、訊息回溯等。它裡面有幾個區別於標準訊息中件間的概念,如GroupTopicQueue等。系統組成則由ProducerConsumerBrokerNameServer等。

RocketMQ特點:

  • 是一個佇列模型的訊息中介軟體,具有高效能、高可靠、高實時、分散式等特點;
  • ProducerConsumer、佇列都可以分散式;
  • Producer向一些佇列輪流傳送訊息,佇列集合稱為TopicConsumer如果做廣播消費,則一個Consumer例項消費這個Topic
    對應的所有佇列,如果做叢集消費,則多個Consumer例項平均消費這個Topic對應的佇列集合;
  • 能夠保證嚴格的訊息順序;
  • 支援拉(pull)和推(push)兩種訊息模式;
  • 高效的訂閱者水平擴充套件能力;
  • 實時的訊息訂閱機制;
  • 億級訊息堆積能力;
  • 支援多種訊息協議,如JMSOpenMessaging等;
  • 較少的依賴。

二、訊息中介軟體對比

RocketMQ是阿里巴巴在2012年開源的分散式訊息中介軟體,目前已經捐贈給Apache軟體基金會,並於2017年9月25日成為Apache的頂級專案,撐得住雙11,不過在windows部署會有一些不穩定,最好還是在linux上部署。

三、RocketMQ的核心概念

3.1 Name Server、Broker

訊息佇列RocketMQ在任何一個環境都是可擴充套件的,生產者必須是一個叢集,訊息伺服器必須是一個叢集,消費者也同樣。叢集級別的高可用,是訊息佇列RocketMQ跟其他的訊息伺服器的主要區別,訊息生產者傳送一條訊息到訊息伺服器,訊息伺服器會隨機的選擇一個消費者,只要這個消費者消費成功就認為是成功了。

注意:文中所提及的訊息佇列RocketMQ的服務端或者伺服器包含Name ServerBroker等。服務端不等同於Broker

RocketMQ主要由ProducerBrokerConsumer三部分組成,其中Producer負責生產訊息,Consumer

負責消費訊息,Broker負責儲存訊息。Broker在實際部署過程中對應一臺伺服器,每個Broker可以儲存多個Topic的訊息,每個Topic的訊息也可以分片儲存於不同的BrokerMessage Queue用於儲存訊息的實體地址,每個Topic中的訊息地址儲存於多個Message Queue中。ConsumerGroup由多個Consumer例項構成。

圖中所涉及到的概念如下所述:

  1. Name Server:名稱服務充當路由訊息的提供者。是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。在訊息佇列RocketMQ中提供命名服務,更新和發現Broker服務。

NameServer即名稱服務,兩個功能:

  • 接收broker的請求,註冊broker的路由資訊
  • 接收client(producer/consumer的請求,根據某個topic獲取其到broker的路由資訊

NameServer沒有狀態,可以橫向擴充套件。每個broker在啟動的時候會到NameServer註冊;
Producer在傳送訊息前會根據topicNameServer獲取路由(到broker)資訊;
Consumer也會定時獲取topic路由資訊。

  1. Broker:訊息中轉角色,負責儲存訊息,轉發訊息。可以理解為訊息佇列伺服器,提供了訊息的接收、儲存、拉取和轉發服務。brokerRocketMQ的核心,所以需要保證broker的高可用。
     broker分為Master BrokerSlave Broker,一個Master Broker可以對應多個Slave Broker,但是一個Slave Broker只能對應一個Master Broker
     MasterSlave的對應關係通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId0表示Master,非0表示SlaveMaster也可以部署多個。
     每個BrokerName Server叢集中的所有節點建立長連線,定時註冊Topic資訊到所有Name ServerBroker啟動後需要完成一次將自己註冊至Name Server的操作;隨後每隔30s定期向Name Server上報Topic路由資訊。
  2. Producer:負責生產訊息,一般由業務系統負責生產訊息。與Name Server叢集中的其中一個節點(隨機)建立長連結(Keep-alive),定期從Name Server讀取Topic路由資訊,並向提供Topic服務的Master Broker建立長連結,且定時向Master Broker傳送心跳。
  3. Consumer:負責消費訊息,一般是後臺系統負責非同步消費。與Name Server叢集中的其中一個節點(隨機)建立長連線,定期從Name Server拉取Topic路由資訊,並向提供Topic服務的Master BrokerSlave Broker建立長連線,且定時向Master BrokerSlave Broker傳送心跳。Consumer既可以從Master Broker訂閱訊息,也可以從Slave Broker訂閱訊息,訂閱規則由Broker配置決定。
    另外,Broker中還存在一些非常重要的名詞需要說明:

3.2 Topic、Queue、tags

RocketMQTopic/QueueJMS中的Topic/Queue概念有一定的差異,JMS中所有消費者都會消費一個Topic訊息的副本,而Queue中訊息只會被一個消費者消費;但到了RocketMQTopic只代表普通的訊息佇列,而Queue是組成Topic的更小單元。

Topic:表示訊息的第一級型別,比如一個電商系統的訊息可以分為:交易訊息、物流訊息等。一條訊息必須有一個Topic。
Queue:主題被劃分為一個或多個子主題,稱為“message queues”。一個topic下,我們可以設定多個queue(訊息佇列)。當我們傳送訊息時,需要要指定該訊息的topic。RocketMQ會輪詢該topic下的所有佇列,將訊息傳送出去。

定義:QueueTopic在一個Broker上的分片,在分片基礎上再等分為若干份(可指定份數)後的其中一份,是負載均衡過程中資源分配的基本單元。叢集消費模式下一個消費者只消費該Topic中部分Queue中的訊息,當一個消費者開啟廣播模式時則會消費該Topic下所有Queue中的訊息。

先看一張有關TopicQueue的關係圖:

TagsTagsTopic下的次級訊息型別/二級型別(注:Tags也支援TagA || TagB這樣的表示式),可以在同一個Topic下基於Tags進行訊息過濾。Tags的過濾需要經過兩次比對,首先會在Broker端通過Tag hashcode進行一次比對過濾,匹配成功傳到consumer端後再對具體Tags進行比對,以防止Tag hashcode重複的情況。比如交易訊息又可以分為:交易建立訊息,交易完成訊息等,一條訊息可以沒有TagRocketMQ提供2級訊息分類,方便大家靈活控制。標籤,換句話說,為使用者提供了額外的靈活性。有了標籤,來自同一個業務模組的不同目的的訊息可能具有相同的主題和不同的標籤。標籤將有助於保持您的程式碼乾淨和連貫,並且標籤還可以為RocketMQ提供的查詢系統提供幫助。

Queue中具體的儲存單元結構如下圖,最後面的8Byte儲存Tag資訊。

3.3 Producer與Producer Group

Producer:表示訊息佇列的生產者。訊息佇列的本質就是實現了publish-subscribe模式,生產者生產訊息,消費者消費訊息。所以這裡的Producer就是用來生產和傳送訊息的,一般指業務系統。RocketMQ提供了傳送:普通訊息(同步、非同步和單向)、定時訊息、延時訊息、事務訊息。

Producer Group:是一類Producer的集合名稱,這類Producer通常傳送一類訊息,且傳送邏輯一致。相同角色的生產者被分組在一起。同一生產者組的另一個生產者例項可能被broker聯絡,以提交或回滾事務,以防原始生產者在交易後崩潰。

警告:考慮提供的生產者在傳送訊息時足夠強大,每個生產者組只允許一個例項,以避免對生產者例項進行不必要的初始化。

3.4 Consumer與Consumer Group

Consumer:訊息消費者,一般由業務後臺系統非同步的消費訊息。

  • Push ConsumerConsumer的一種,應用通常向Consumer物件註冊一個Listener介面,一旦收到訊息,Consumer物件立刻回撥Listener介面方法。
  • Pull ConsumerConsumer的一種,應用通常主動呼叫Consumer的拉訊息方法從Broker拉訊息,主動權由應用控制。

Consumer GroupConsumer Group是一類Consumer的集合名稱,這類Consumer通常消費一類訊息,且消費邏輯一致(使用相同Group ID的訂閱者屬於同一個叢集。同一個叢集下的訂閱者消費邏輯必須完全一致(包括Tag的使用),這些訂閱者在邏輯上可以認為是一個消費節點)。消費者群體是一個偉大的概念,它實現了負載平衡和容錯的目標,在資訊消費方面,是非常容易的。

警告:消費者群體的消費者例項必須訂閱完全相同的主題。

四、元件的關係

4.1 Broker、Producer和Consumer

如果不考慮負載均衡和高可用,最簡單的BrokerProducerConsumer之間的關係如下圖所示:

4.2 Topic,Topic分片和Queue

Queue是RocketMQ中的另一個重要概念。在對該概念進行分析介紹前,我們先來看上面的這張圖:

從本質上來說,RocketMQ中的Queue是資料分片的產物。為了更好地理解Queue的定義,我們還需要引入一個新的概念:Topic分片。在分散式資料庫和分散式快取領域,分片概念已經有了清晰的定義。同理,對於RocketMQ,一個Topic可以分佈在各個Broker上,我們可以把一個Topic分佈在一個Broker上的子集定義為一個Topic分片。對應上圖,TopicA3Topic分片,分佈在Broker1Broker2Broker3上,TopicB2Topic分片,分佈在Broker1Broker2上,TopicC2Topic分片,分佈在Broker2Broker3上。

將Topic分片再切分為若干等分,其中的一份就是一個Queue。每個Topic分片等分的Queue的數量可以不同,由使用者在建立Topic時指定。

queue數量指定方式:
1、程式碼指定:producer.setDefaultTopicQueueNums(8);
2、配置檔案指定
同時設定broker伺服器的配置檔案broker.properties:defaultTopicQueueNums=16
3、rocket-console控制檯指定

我們知道,資料分片的主要目的是突破單點的資源(網路頻寬,CPU,記憶體或檔案儲存)限制從而實現水平擴充套件。RocketMQ在進行Topic分片以後,已經達到水平擴充套件的目的了,為什麼還需要進一步切分為Queue呢?

解答這個問題還需要從負載均衡說起。以訊息消費為例,借用Rocket MQ官方文件中的Consumer負載均衡示意圖來說明:

如圖所示,TOPIC_A在一個Broker上的Topic分片有5Queue,一個Consumer Group內有2Consumer按照叢集消費的方式消費訊息,按照平均分配策略進行負載均衡得到的結果是:第一個Consumer消費3Queue,第二個Consumer消費2Queue。如果增加Consumer,每個Consumer分配到的Queue會相應減少。Rocket MQ的負載均衡策略規定:Consumer數量應該小於等於Queue數量,如果Consumer超過Queue數量,那麼多餘的Consumer將不能消費訊息。

在一個Consumer Group內,QueueConsumer之間的對應關係是一對多的關係:一個Queue最多隻能分配給一個Consumer,一個Cosumer可以分配得到多個Queue。這樣的分配規則,每個Queue只有一個消費者,可以避免消費過程中的多執行緒處理和資源鎖定,有效提高各Consumer消費的並行度和處理效率。

由此,我們可以給出Queue的定義:
QueueTopic在一個Broker上的分片等分為指定份數後的其中一份,是負載均衡過程中資源分配的基本單元。

五、RocketMQ生產模式

  1. 訊息傳送方式
    RocketMQ提供三種方式可以傳送普通訊息:同步、非同步、和單向傳送。
  2. 訊息型別
    訊息客戶端提供多種SDK:普通、順序、事務、延時訊息

5.1 傳送方式

RocketMQ傳送普通訊息的三種方式:同步訊息(預設)、非同步訊息和單向訊息。其中前兩種訊息是可靠的,因為會有傳送是否成功的應答。

5.1.1 同步傳送(Sync)

1、原理

同步傳送是指訊息傳送方發出一條訊息後,會在收到服務端返回響應之後才發下一條訊息的通訊方式。

2、應用場景

可靠的同步傳輸應用於廣泛的場景,例如重要通知郵件、報名簡訊通知、營銷簡訊系統等。

3、例項程式碼

SendResult sendResult = producer.send(msg);

5.2.2 非同步傳送(Async)

1、原理

非同步傳送是指傳送方發出一條訊息後,不等服務端返回響應,接著傳送下一條訊息的通訊方式。訊息佇列RocketMQ版的非同步傳送,需要您實現非同步傳送回撥介面(SendCallback)。訊息傳送方在傳送了一條訊息後,不需要等待服務端響應即可傳送第二條訊息。傳送方通過回撥介面接收服務端響應,並處理響應結果。

2、應用場景

非同步傳送一般用於鏈路耗時較長,對響應時間較為敏感的業務場景,例如,您視訊上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

3、例項程式碼

// 非同步傳送訊息, 傳送結果通過callback返回給客戶端。
producer.sendAsync(msg, new SendCallback() {
    @Override
    public void onSuccess(final SendResult sendResult) {
        // 訊息傳送成功。
        System.out.println("send message success. topic=" + sendResult.getTopic()
                   + ", msgId=" + sendResult.getMessageId());
    }
    
    @Override
    public void onException(OnExceptionContext context) {
        // 訊息傳送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
        System.out.println("send message failed. topic=" + context.getTopic()
                   + ", msgId=" + context.getMessageId());
    }
});

5.2.3 單向傳送(Oneway)

由於在oneway方式傳送訊息時沒有請求應答處理,一旦出現訊息傳送失敗,則會因為沒有重試而導致資料丟失。
若資料不可丟,建議選用可靠同步或可靠非同步傳送方式。

1、原理

傳送方只負責傳送訊息,不等待服務端返回響應且沒有回撥函式觸發,即只發送請求不等待應答。此方式傳送訊息的過程耗時非常短,一般在微秒級別。

2、應用場景

適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。

3、例項程式碼

//傳送單向訊息,沒有任何返回結果
producer.sendOneway(msg);

5.2 訊息型別

訊息客戶端提供多種SDK:普通、順序、事務、延時訊息

5.2.1 收發順序訊息

順序訊息(FIFO訊息)是訊息佇列RocketMQ版提供的一種嚴格按照順序來發布和消費的訊息型別。

順序訊息分為兩類:

  • 全域性順序:對於指定的一個Topic,所有訊息按照嚴格的先入先出FIFO(First In First Out)的順序進行釋出和消費。
  • 分割槽順序:對於指定的一個Topic,所有訊息根據Sharding Key進行區塊分割槽。同一個分割槽內的訊息按照嚴格的FIFO順序進行釋出和消費。Sharding Key是順序訊息中用來區分不同分割槽的關鍵欄位,和普通訊息的Key是完全不同的概念。

例項程式碼

傳送訊息

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
     }
}, orderId);

訂閱訊息

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeOrderlyStatus.SUCCESS;
   }
});

5.2.2 傳送延遲訊息

延時訊息用於指定訊息傳送到訊息佇列RocketMQ版的服務端後,延時一段時間才被投遞到客戶端進行消費(例如3秒後才被消費),適用於解決一些訊息生產和消費有時間視窗要求的場景,或者通過訊息觸發延遲任務的場景,類似於延遲佇列。

例項程式碼

// 延時訊息,單位毫秒(ms),在指定延遲時間(當前時間之後)進行投遞,例如訊息在3秒後投遞。
long delayTime = System.currentTimeMillis() + 3000;
// 設定訊息需要被投遞的時間。 
msg.setStartDeliverTime(delayTime);

5.2.3 傳送定時訊息

定時訊息可以做到在指定時間戳之後才可被消費者消費,適用於對訊息生產和消費有時間視窗要求,或者利用訊息觸發定時任務的場景。

例項程式碼

// 定時訊息,單位毫秒(ms),在指定時間戳(當前時間之後)進行投遞,例如2021-06-26 16:20:00投遞。
//如果被設定成當前時間戳之前的某個時刻,訊息將立即被投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-06-26 16:20:00").getTime();
msg.setStartDeliverTime(timeStamp);

5.2.4 傳送延時訊息

使用場景

比如電商裡,提交了一個訂單就可以傳送一個延時訊息,1h後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

延時訊息的使用限制

現在RocketMQ並不支援任意時間的延時,需要設定幾個固定的延時等級,從1s2h分別對應著等級118
訊息消費失敗會進入延時訊息佇列,訊息傳送時間與設定的延時等級和重試次數有關

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Message message = new Message("delay", msg.getBytes());
//在建立好message物件之後
message.setDelayTimeLevel(3); //這裡是對應延時等級3對應是10s

5.2.5 傳送事務訊息

訊息佇列RocketMQ版提供類似XAOpen XA的分散式事務功能,通過訊息佇列RocketMQ版事務訊息,能達到分散式事務的最終一致。
事務訊息互動流程如下圖所示。

例項程式碼

傳送事務訊息包含以下兩個步驟:

  1. 傳送半事務訊息(Half Message)及執行本地事務,
public static void main(String[] args) throws MQClientException {
    /**
     * 建立事務訊息Producer
     */
    TransactionMQProducer transactionMQProducer = new TransactionMQProducer(MqConfig.GROUP_ID, getAclRPCHook());
    transactionMQProducer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
    transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
    transactionMQProducer.start();

    for (int i = 0; i < 10; i++) {
        try {
            Message message = new Message(MqConfig.TOPIC,
                MqConfig.TAG,
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                    System.out.println("開始執行本地事務: " + msg);
                    return LocalTransactionState.UNKNOW;
                }
            }, null);
            assert sendResult != null;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. 提交事務訊息狀態。

當本地事務執行完成(執行成功或執行失敗),需要通知伺服器當前訊息的事務狀態。通知方式有以下兩種:

  • 執行本地事務完成後提交。
  • 執行本地事務一直沒提交狀態,等待伺服器回查訊息的事務狀態。

事務狀態有以下三種:

  • TransactionStatus.CommitTransaction:提交事務,允許訂閱方消費該訊息。
  • TransactionStatus.RollbackTransaction:回滾事務,訊息將被丟棄不允許消費。
  • TransactionStatus.Unknow:無法判斷狀態,期待訊息佇列RocketMQ版的Broker向傳送方再次詢問該訊息對應的本地事務的狀態。
/**
 * MQ傳送事務訊息本地Check介面實現類
 */
public class LocalTransactionCheckerImpl implements TransactionCheckListener {

    @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.println("收到事務訊息的回查請求, MsgId: " + msg.getMsgId());
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

事務回查機制說明

  • 傳送事務訊息為什麼必須要實現回查Check機制?當步驟1中半事務訊息傳送完成,但本地事務返回狀態為TransactionStatus.Unknow,或者應用退出導致本地事務未提交任何狀態時,從Broker的角度看,這條Half狀態的訊息的狀態是未知的。因此Broker會定期要求傳送方CheckHalf狀態訊息,並上報其最終狀態。

  • Check被回撥時,業務邏輯都需要做些什麼?事務訊息的Check方法裡面,應該寫一些檢查事務一致性的邏輯。訊息佇列RocketMQ版傳送事務訊息時需要實現LocalTransactionChecker介面,用來處理Broker主動發起的本地事務狀態回查請求,因此在事務訊息的Check方法中,需要完成兩件事情:

    1. 檢查該半事務訊息對應的本地事務的狀態(committed or rollback)。
    2. Broker提交該半事務訊息本地事務的狀態。

5.2.6 傳送批量訊息

批量傳送訊息能顯著提高傳遞小訊息的效能。限制是這些批量訊息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時訊息。此外,這一批訊息的總大小不應超過4MB

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //處理error
}

批量傳送訊息

//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
    messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//split the large batch into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    List<Message> listItem = splitter.next();
    producer.send(listItem);
}
class ListSplitter implements Iterator<List<Message>> {
    private int sizeLimit = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > sizeLimit) {
                //it is unexpected that single message exceeds the sizeLimit
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                    //if the next sublist has no element, add this one and then break, otherwise just break
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}

5.3 分散式訊息OpenMessaging

除了以上傳送方式外,RocketMQ還支援面向雲的分散式訊息,請參考OpenMessaging 示例

六、RocketMQ消費模式

6.1 訂閱方式

訊息佇列RocketMQ版支援以下兩種訂閱方式:
1.叢集訂閱:同一個Group ID所標識的所有Consumer平均分攤消費訊息。例如某個Topic9條訊息,一個Group ID3Consumer例項,那麼在叢集消費模式下每個例項平均分攤,只消費其中的3條訊息。設定方式如下所示。

// 叢集訂閱方式設定(不設定的情況下,預設為叢集訂閱方式)。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

2.廣播訂閱:同一個Group ID所標識的所有Consumer都會各自消費某條訊息一次。例如某個Topic9條訊息,一個Group ID3Consumer例項,那麼在廣播消費模式下每個例項都會各自消費9條訊息。設定方式如下所示。

// 廣播訂閱方式設定。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

說明:請確保同一個Group ID下所有Consumer例項的訂閱關係保持一致。兩種不同的訂閱方式有著不同的功能限制,例如,廣播模式不支援順序訊息、不維護消費進度、不支援重置消費位點等。

6.2 消費方式

訊息佇列RocketMQ版支援以下兩種訊息獲取方式:

  • Push(預設):訊息由訊息佇列RocketMQ版推送至ConsumerPush方式下,訊息佇列RocketMQ版還支援批量消費功能,可以將批量訊息統一推送至Consumer進行消費。
  • Pull:訊息由Consumer主動從訊息佇列RocketMQ版拉取。

push方式實現

// 程式第一次啟動從訊息佇列頭獲取資料
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 廣播消費
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
// 註冊回撥實現類來處理從broker拉取回來的訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 標記該訊息已經被成功消費
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

pull方式實現

//消費者監聽處理訊息的方法,這裡是抽取的方式
PullResult pullResult = consumer.pull(new MessageQueue("pull-push","*");
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
    System.out.println("消費執行緒:"+Thread.currentThread().getName()+
        ",訊息ID:"+messageExt.getMsgId()+
        ",訊息內容:"+new String(messageExt.getBody()));
}

消費端的Push模式是通過長輪詢的模式來實現的,就如同下圖:

Consumer端每隔一段時間主動向broker傳送拉訊息請求,broker在收到Pull請求後,如果有訊息就立即返回資料,Consumer端收到返回的訊息後,再回調消費者設定的Listener方法。如果broker在收到Pull請求時,訊息佇列裡沒有資料,broker端會阻塞請求直到有資料傳遞或超時才返回。

當然,Consumer端是通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>中的PullRequest傳送到broker拉取訊息,以防止Consumer一致被阻塞。而Broker端,在接收到ConsumerPullRequest時,如果發現沒有訊息,就會把PullRequest扔到ConcurrentHashMap中快取起來。

broker在啟動時,會啟動一個執行緒不停的從ConcurrentHashMap取出PullRequest檢查,直到有資料返回。

區別

推送方式:必須消費者線上,當生產者生成了資訊之後,消費者自動獲取
抽取方式:當生成者生成了訊息之後,這時剛剛啟動消費者,自動抽取生成者沒有消費的資訊

注意:如果是springboot繼承的話,消費者不會停止啟動,都是使用推送方式

6.3 訊息過濾

生產者建立訊息

// 建立訊息,並指定Topic,Tag和訊息體生成者
Message msg = new Message(
"Topic" ,// Topic
"Tag", // Tag
("hello world").getBytes("utf-8") //Message body
);

6.3.1 Tag標籤過濾

傳送訊息時我們會為每一條訊息設定Tag標籤,同一大類中的訊息放在一個主題Topic下,但是如果進行分類我們則可以根據Tag進行分類,每一類消費者可能不是關係某個主題下的所有訊息,我們就可以通過Tag進行過濾,訂閱關注的某一類資料。

// 設定標籤 消費者
// 訂閱主題Topic和Tag
consumer.subscribe("Topic", "TagA || TagB");

消費者組訂閱相同的主題不同的Tag時,如果訂閱是多個Tag則通過“||”分割
同一個消費者組訂閱的主題,Tag必須相同

6.3.2 SQL過濾

SQL92表示式訊息過濾,是通過訊息的屬性執行SQL過濾表示式進行條件匹配,訊息傳送時需要設定使用者的屬性putUserProperty方法設定屬性。

支援的語法:

  1. 數值比較, 如>,>=,<,<=,BETWEEN,=
  2. 字元比較, 如=,<>,IN;
  3. IS NULL或者IS NOT NULL;
  4. 邏輯連線符,AND,OR,NOT;

支援的常量型別:

  1. 數值型,如1233.1415
  2. 字元型,如‘abc’(必須用單引號);
  3. NULL,特殊常數;
  4. 布林值,TRUEFALSE
//訂閱主題Topic和Tag
consumer.subscribe("filterSqlTopic", MessageSelector.bySql("i>5"));

注意:1.只有使用push模式的消費者才能用使用SQL92標準的sql語句
2.SQL92過濾預設關閉,需要在代理開始前設定conf資料夾下broker.conf的引數:

#開啟對filter的支援
enablePropertyFilter=true

優化

拉取訊息時進行過濾的效能很差:

  1. 堆外到堆,一旦每個消費者訂閱相同的主題拉訊息。
  2. 解碼訊息屬性,一旦每個消費者訂閱相同的主題拉訊息。

可使用BloomFilter預計算優化情況,具體請參考:RocketMQ通過SQL92過濾訊息

注意:此優化預設關閉,需要在代理開始開啟時設定一些配置:

  1. enableCalcFilterBitMap = true,表示在構建消費佇列時計算點陣圖。
  2. expectConsumerNumUseFilter = XX(Integer, default is 32), 表示估計訂閱相同主題的消費者數量。
  3. maxErrorRateOfBloomFilter = XX(1~100,預設為20),表示布隆過濾器的錯誤率。
  4. enableConsumeQueueExt = true,表示構造消費佇列擴充套件檔案。

6.3.3 類過濾

通過定義訊息過濾類的介面實現訊息過濾

//使用Java程式碼,在伺服器做訊息過濾
String filterCode = MixAll.file2String("opt\\classfilter\\MessageFilterImpl.java");
consumer.subscribe("TopicFilter1", "cn.gumx.rocketmq.filter.MessageFilterImpl",filterCode);

自定義訊息的過濾類

public class MessageFilterImpl implements MessageFilter {
     
    public boolean match(MessageExt msg, FilterContext arg1) {
        String property = msg.getUserProperty("age");
        if (property != null) {
            int age = Integer.parseInt(property);
            if ((age % 3) == 0 && (age > 10)) {
                return true;
            }
        }
        return false;
    }
}

使用類訊息過濾模式,需要額外需要啟動filter元件mqfiltersrv服務,否則消費不了,每個broker都需要啟動一個,相當於加了一層過濾層。啟動命令如下:

./mqfiltersrv -n 10.10.12.203:9876;10.10.12.204:9876 &

filtersrv出現了。減少了Broker的負擔,又減少了Consumer接收無用的訊息。當然缺點也是有的,多了一層filtersrv網路開銷
MessageFilterImpl訊息過濾實現類中的程式碼最好不要帶有中文防止錯誤

注意RocketMQ4.3.1開始刪除與mqfilter伺服器相關的指令碼,4.3.2刪除客戶端關於mqfilter客戶端程式碼,後面版本不支援該功能。

參考文章