1. 程式人生 > 其它 >RocketMQ系列(二)——應用篇

RocketMQ系列(二)——應用篇

技術標籤:RocketMQ

本篇以應用角度講解RocketMQ的本地安裝、啟動與簡單配置,以及各主要類的功能及使用方式。更多其它瞭解請參考:

 RocketMQ系列(一)——基礎篇
 RocketMQ系列(三)——原理篇

一、本地安裝與部署

 以windows平臺為例,Linux、Mac OS類似

1、安裝包下載地址

 https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
 解壓後即可使用,記得配置環境變數 ROCKETMQ_HOME=${yourPath}\rocketmq-all-4.2.0-bin-release以及加入到CLASSPATH

2、啟動

 cd %ROCKETMQ_HOME%\bin
 start mqnamesrv.cmd # 首先需要啟動NameServer
 start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true # 啟動broker並設定可以自動建立topic

3、配置檔案

 官方預設提供了4中broker的部署方式,分別是單節點、雙主從同步雙寫(2m-2s-sync)、雙主雙從非同步複製(2m-2s-async)和雙主模式(2m-noslave),啟動各個broker時可以在 mqbroker.cmd -c 引數後面跟配置檔案指定部署模式。

在這裡插入圖片描述
 每個預設配置項的含義:

配置項含義
brokerClusterName=DefaultCluster叢集名稱,相同的brokerClusterName組成叢集
brokerId=0節點id,0表示master,其它表示slave
brokerName=broker-a節點名稱,相同節點名的多個節點為主從關係
brokerRole=ASYNC_MASTER節點角色,SYNC_MASTER:同步雙寫的master,ASYNC_MASTER:非同步複製的master,SLAVE:從節點
deleteWhen=04在每天的幾點刪除已經超過檔案保留時間的commit log
fileReservedTime=48訊息儲存的時間,小時為單位
flushDiskType=ASYNC_FLUSH持久化方式,SYNC_FLUSH:同步刷盤,ASYNC_FLUSH:非同步刷盤

二、主要類介紹

1、生產者相關

DefaultMQProducer

 預設生產者,應用程式通過各種過載的send和sendOneway方法傳送訊息,此外還可以建立topic;一個DefaultMQProducer例項必須且只能屬於某一個producer group,可以向多個topic傳送訊息,一個producer group在同一套配置中也只能有一個producer例項,具體原因在系列三的原理篇會講到。

TransactionMQProducer

 事務訊息生產者,繼承自DefaultMQProducer,除了DefaultMQProducer提供的方法外,還可以通過sendMessageInTransaction方法傳送事務訊息,這要求註冊一個事務監聽介面(TransactionListener)用於完成本地事務的執行和執行狀態判斷。

SendCallBack

 非同步傳送回撥介面,定義了非同步傳送訊息完成後的執行操作。

public interface SendCallback {
    // 傳送完成(注意有別於傳送成功)回撥方法
    void onSuccess(final SendResult sendResult);
     
    // 傳送異常回調方法
    void onException(final Throwable e);
}

RPCHook

 顧名思義,rpc呼叫鉤子,可以在客戶端向服務端(這裡指broker,下同)發起呼叫前(doBeforeRequest)後(doAfterResponse)執行特定操作,比如列印日誌等;同樣也適用於消費者

MessageQueueSelector

 訊息佇列選擇器,選擇訊息需要傳送的佇列,可以用於負載均衡和訊息區域性有序場景,如輪詢佇列或根據訂單號取模選擇傳送佇列;不傳預設使用MQFaultFallbackStrategy選擇佇列

public interface MessageQueueSelector {
    // 從訊息列表mqs中選擇一個佇列傳送msg訊息
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

TransactionListener

 事務監聽介面,用於事務訊息傳送時服務端回撥客戶端以執行本地事務或回查本地事務狀態(LocalTransactionState )

public interface TransactionListener {
    // 服務端通知客戶端執行本地事務
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
     
    // 服務端在呼叫executeLocalTransaction超時後,回查本地事務執行狀態
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

MessageQueue

 訊息佇列實體類,包含所屬topic,所在的broker name和佇列id

Message

 訊息實體類,主要包含所屬topic,應用程式需要傳送的訊息體body,以及Map型別的屬性集合properties

 在properties中儲存的屬性大致可以分為三類:

  TAGS、KEYS、DELAY等預定義業務屬性,使用者可以通過介面訪問

  RETRY_TOPIC、REAL_TOPIC、TRAN_MSG等預定義系統屬性,用於特定功能的實現

  使用者自定義屬性,如設定age=8,amount=1000等,用於訊息過濾,見系列一基礎篇的SQL表示式過濾模式

狀態相關

SendResult

 同步傳送訊息時會立即返回傳送結果,包含傳送狀態物件SendStatus

SendStatus

 服務端返回的訊息投遞結果,對於事務訊息,後三種情況下客戶端會要求服務端回滾訊息

列舉取值解釋說明
SEND_OK傳送成功服務端事務訊息提交
FLUSH_DISK_TIMEOUT刷盤超時服務端事務訊息回滾
FLUSH_SLAVE_TIMEOUT同步slave超時服務端事務訊息回滾
SLAVE_NOT_AVAILABLE沒有可用的slave服務端事務訊息回滾

LocalTransactionState

 本地事務狀態,事務監聽介面返回該狀態

列舉取值解釋
COMMIT_MESSAGE服務端提交訊息
ROLLBACK_MESSAGE服務端回滾訊息
UNKNOW未決狀態,返回該狀態後服務端會再次回查

2、消費者相關

DefaultMQPushConsumer

 預設的push方式消費者,通過註冊MessageListener實現類定義消費業務邏輯

DefaultMQPullConsumer

 預設的pull方式消費者,可以獲取到某topic下的訊息佇列並主動拉取訊息進行消費

MessageFilter

 訊息過濾器介面,自定義訊息過濾規則,哪些訊息可以推送到消費者

public interface MessageFilter {
    boolean match(final MessageExt msg, final FilterContext context);
}

AllocateMessageQueueStrategy

 訊息佇列分配策略,定義同一個topic下的訊息佇列(注意不是訊息,通常一個訊息佇列只由同一個consumer消費)列表在不同consumer之間如何分配,在建立consumer時可指定。
 很明顯,該策略只在叢集模式下才會使用,廣播模式下每個消費者都需要消費所有佇列。
 官方提供了6種實現,具體實現策略戳這裡,預設策略是AllocateMessageQueueAveragely,介面定義如下:

public interface AllocateMessageQueueStrategy {
    // 返回當前consumer消費的佇列
    List<MessageQueue> allocate(
        final String consumerGroup,     // 消費者組
        final String currentCID,        // 當前consumer id
        final List<MessageQueue> mqAll,   // 當前topic下的全部訊息佇列
        final List<String> cidAll     // 該consumer group下的所有consumer id列表
    );
     
    // 返回策略名稱
    String getName();
}

MessageListener

 訊息監聽介面,有MessageListenerConcurrently和MessageListenerOrderly兩種擴充套件方式

MessageListenerConcurrently

 併發訊息監聽介面,可以批量獲取訊息,不保證訊息的接收和消費順序;返回併發消費狀態ConsumeConcurrentlyStatus

public interface MessageListenerConcurrently extends MessageListener {
    // 併發消費訊息方法宣告
    ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeConcurrentlyContext context);
}

MessageListenerOrderly

 順序訊息監聽介面,單執行緒保證訊息的消費順序;返回順序消費狀態ConsumeOrderlyStatus

public interface MessageListenerOrderly extends MessageListener {
    // 順序消費訊息方法宣告
    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs, final ConsumeOrderlyContext context);
}

型別/狀態相關

MessageModel

 消費者的訊息模式

列舉取值解釋說明
BROADCASTING廣播模式訊息偏移量儲存在本地
沒有負載均衡,消費時不存在佇列選擇問題
CLUSTERING叢集模式訊息偏移量儲存在服務端
push模式下有負載均衡問題,消費時可以指定訊息佇列選擇器
pull模式下可以拉取全部佇列,也可以均衡拉取佇列進行消費

ConsumeFromWhere

 定義消費者啟動後從何處開始消費

列舉取值解釋
CONSUME_FROM_LAST_OFFSET從最新的偏移量開始消費
CONSUME_FROM_FIRST_OFFSET從佇列頭開始消費
CONSUME_FROM_TIMESTAMP從某個時刻開始消費

ConsumeConcurrentlyStatus

 併發消費狀態

列舉取值解釋
CONSUME_SUCCESS消費成功
ECONSUME_LATER消費失敗,會重新投遞到%RETRY%+${consumergroup}佇列

ConsumeOrderlyStatus

 順序消費狀態

列舉取值解釋
SUCCESS消費成功
SUSPEND_CURRENT_QUEUE_A_MOMENT消費失敗,稍後會重新消費

PullResult

 拉取訊息返回結果,包含拉取狀態PullStatus

PullStatus

 拉取結果

列舉取值解釋
FOUND拉取成功
NO_NEW_MSG沒有新訊息
NO_MATCHED_MSG沒有匹配的訊息
OFFSET_ILLEGAL偏移量不合法

三、生產者

1、DefaultMQProducer

初始化並啟動

 建立並啟動DefaultMQProducer

private DefaultMQProducer createDefaultProducer() throws MQClientException {
    DefaultMQProducer defaultMQProducer = new DefaultMQProducer(defaultProducerGroup, new RPCHook() {
        @Override
        public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
            // TODO 向服務端發起請求前執行操作
        }
 
        @Override
        public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            // TODO 服務端發起請求返回後執行操作
        }
    });
    defaultMQProducer.setNamesrvAddr(namesrvAddr);
    // 預設佇列數量
    defaultMQProducer.setDefaultTopicQueueNums(2);
    // 傳送超時時間
    defaultMQProducer.setSendMsgTimeout(100);
    // 傳送失敗最大重試次數
    defaultMQProducer.setRetryTimesWhenSendFailed(2);
    defaultMQProducer.start();
 
    return defaultMQProducer;
}

傳送訊息

// 單條訊息傳送
SendResult sendResult = defaultMQProducer.send(message);
 
// 批量訊息傳送
SendResult sendResult = defaultMQProducer.send(Lists.newArrayList(message));
 
// 設定傳送超時時間
SendResult sendResult = defaultMQProducer.send(message, timeout);
 
// 指定訊息佇列
SendResult sendResult = defaultMQProducer.send(message, messageQueue);
 
// 指定訊息佇列選擇器,可以實現相關訊息的順序傳送
SendResult sendResult = defaultMQProducer.send(message, messageQueueSelector, null);
 
// 非同步傳送並指定回撥介面
defaultMQProducer.send(message, sendCallback);
 
// 單向傳送
defaultMQProducer.sendOneway(message);
 
// 單向傳送並指定訊息佇列
defaultMQProducer.sendOneway(message, messageQueue);
 
// 單向傳送並指定訊息佇列選擇器
defaultMQProducer.sendOneway(message, messageQueueSelector, key);

……
以上傳送方式的組合

2、TransactionMQProducer

初始化並啟動

 建立並啟動TransactionMQProducer

private TransactionMQProducer createTransactionProducer() throws MQClientException {
    TransactionMQProducer transactionMQProducer = new TransactionMQProducer(transactionProducerGroup);
    transactionMQProducer.setNamesrvAddr(namesrvAddr);
    transactionMQProducer.setRetryTimesWhenSendFailed(2);
    transactionMQProducer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // TODO 執行本地事務,成功返回COMMIT_MESSAGE,失敗返回ROLLBACK_MESSAGE,未決返回UNKNOW
            return LocalTransactionState.COMMIT_MESSAGE;
        }
 
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // TODO 查詢本地事務狀態,返回結果同上
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    });
    transactionMQProducer.start();
 
    return transactionMQProducer;
}

傳送訊息

SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, key);

四、消費者

1、PushConsumer

初始化與啟動

public DefaultMQPushConsumer createConcurrentlyPushConsumer() throws Exception {
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(pushConsumerGroup);
    pushConsumer.setNamesrvAddr(namesrvAddr);
    pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
    // 過濾方式一、TAG方式
    pushConsumer.subscribe(topic1, "TAG-A || TAG-B*");
    // 過濾方式二、TSQL方式
    pushConsumer.subscribe(topic2, MessageSelector.bySql("amount between 500 and 1000"));
    // 過濾方式三、類過濾方式
    /// 注意:由於是直接將filter檔案直接傳送到broker,因此檔案中不能有非RocketMQ核心包之外的外部依賴
    String filterClassSource = MixAll.file2String("D:\\workspace\\……\\rocketmq\\impl\\MessageFilterImpl.java");
    String fullName = MessageFilterImpl.class.getCanonicalName();
    pushConsumer.subscribe(topic3, fullName, filterClassSource);
 
    // 叢集模式,同一ConsumerGroup中只有一個消費者能消費
    pushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 消費執行緒池最小執行緒數
    pushConsumer.setConsumeThreadMin(2);
    // 消費執行緒池最大執行緒數
    pushConsumer.setConsumeThreadMax(5);
    // 批量消費,一次消費多少條訊息,也即messageListener方法中msgs的size上限
    pushConsumer.setConsumeMessageBatchMaxSize(10);
    // 批量拉訊息,一次最多拉多少條。問題:為什麼push模式下會有拉取訊息的操作?
    pushConsumer.setPullBatchSize(10);
    // 併發訊息監聽介面
    pushConsumer.registerMessageListener(concurrentlyListener);
    // 順序訊息監聽介面
    // pushConsumer.registerMessageListener(orderlyListener);
    // 設定訊息佇列分配策略
    pushConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    pushConsumer.start();
 
    return pushConsumer;
}

消費

 Consumer消費訊息是通過自定義MessageListener實現的。

 1、併發消費,不保證實際的消費順序

public class ConcurrentlyListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            msgs.forEach(msg -> {
               // TODO 消費業務邏輯
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

 2、順序消費,同一佇列的訊息只有一個執行緒消費以保證順序,需要配合producer的MessageQueueSelector使用

public class OrderlyListener implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            msgs.forEach(msg -> {
                // TODO 消費業務邏輯
            });
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}

2、PullConsumer

初始化與啟動

public DefaultMQPullConsumer createPullConsumer() throws Exception {
    DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer(pullConsumerGroup);
    pullConsumer.setNamesrvAddr(namesrvAddr);
    pullConsumer.setMessageModel(MessageModel.CLUSTERING);
    pullConsumer.start();
    return pullConsumer;
}

消費

private void consume() throws Exception {
     // 獲取topic下的全部佇列
     Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(topic);
     // 均衡獲取topic下的部分佇列
     // Set<MessageQueue> queues = pullConsumer.fetchMessageQueuesInBalance(topic);
 
     for (MessageQueue queue : queues) {
         SINGLE_MQ:
         for (; ; ) {
             PullResult pullResult = pullConsumer.pullBlockIfNotFound(queue, "", getOffset(queue) /* 獲取佇列的消費位移 */, 32);
             PullStatus pullStatus = pullResult.getPullStatus();
             // 本地記錄當前queue的消費位移
             setOffset(queue, pullResult.getNextBeginOffset());
 
             switch (pullStatus) {
                 case FOUND:
                     List<MessageExt> messageExts = pullResult.getMsgFoundList();
                     log.info("pull success, msg size: ", messageExts.size());
                     for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                         // TODO 業務處理邏輯
                     }
                     break;
                 case NO_NEW_MSG:
                     break SINGLE_MQ;
                 case NO_MATCHED_MSG:
                 case OFFSET_ILLEGAL:
                 default:
                     break;
             }
         }
     }
 }

【參考資料】

 https://blog.csdn.net/yewandemty/article/details/81989695
 https://www.jianshu.com/p/75badea5ac1e