1. 程式人生 > >RocketMQ專題1:入門

RocketMQ專題1:入門

ability 倒數 base subscribe tle business term dep 1.8

RocketMQ入門

源碼和應用下載

? 這裏以RocketMQ的4.3.0版本為例,本地環境為windows10,jdk1.8, maven3.2.1.

源碼下載地址: http://mirrors.hust.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip

應用下載地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.0/rocketmq-all-4.3.0-bin-release.zip

啟動

? Windows下需要配置環境變量,ROCKETMQ_HOME, 我這裏配置為: E:\software\rocketmq-all-4.3.0-bin-release

? 配置完環境變量後,就可以進入到bin目錄:

  • 啟動server: 直接運行bin目錄下的mqnamesrv.cmd

  • 啟動broker: 運行mqbroker.cmd,發現一閃而過,查看bin目錄下的bk.log日誌,發現錯誤日誌如下:

    錯誤: 找不到或無法加載主類 Files\Java\jdk1.8.0_121\lib;C:\Program

    再查看mqbroker.cmd源碼,發現其最終調用了runbroker.cmd。該腳本的倒數第二行為:

    set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"

    知道問題所在: CLASSPATH的配置中是包含空格的,而空格導致最終解析出來的路徑錯誤。最終我修改倒數第二行為:

    set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""

    ?至此可以順利啟動

? 本以為啟動之後就能就行消息收發了,於是我按照官網示例進入RocketMQ的bin目錄,並通過命令向broker發送消息:

tools org.apache.rocketmq.example.quickstart.Producer

? 結果一直報錯,搜索得知在windows下需要配置環境變量NAMESRV_ADDR127.0.0.1:9876

? 配置完成之後,再依次啟動mqnamesrv和mqbroker,重新測試Producer發現Producer的輸出大致如下:

......
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9C03E5, offsetMsgId=C0A8130100002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=0], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9E03E6, offsetMsgId=C0A8130100002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115EA003E7, offsetMsgId=C0A8130100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=2], queueOffset=249]
11:44:47.790 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10911] result: true
11:44:47.791 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:44:47.793 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10909] result: true

? 在通過命令行運行Consumer:

tools org.apache.rocketmq.example.quickstart.Consumer

? 發現Consumer的輸出為:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=249, sysFlag=0, bornTimestamp=1537242287776, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287778, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BDFE, commitLogOffset=179710, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409812, UNIQ_KEY=C0A8029D46D461BBE9BA5A115EA003E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 57], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=248, sysFlag=0, bornTimestamp=1537242287768, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287768, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BB2E, commitLogOffset=178990, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409811, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9803E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53], transactionId='null'}]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=247, sysFlag=0, bornTimestamp=1537242287761, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287761, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B85E, commitLogOffset=178270, bodyCRC=684865321, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9103DF, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 49], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=246, sysFlag=0, bornTimestamp=1537242287753, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287753, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B58E, commitLogOffset=177550, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E8903DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]]

關閉

? 關閉的步驟與啟動正好相反

  • 關閉brokermqshutdown broker
  • 關閉namesrvmqshutdown namesrv

簡單示例

? 在進行簡單的示例之前,我們先要知道為什麽會出現RocketMQ,下面一段話摘自RocketMQ官網:

Based on our research, with increased queues and virtual topics in use, ActiveMQ IO module reaches a bottleneck. We tried our best to solve this problem through throttling, circuit breaker or degradation, but it did not work well. So we begin to focus on the popular messaging solution Kafka at that time. Unfortunately, Kafka can not meet our requirements especially in terms of low latency and high reliability, see here for details.

In this context, we decided to invent a new messaging engine to handle a broader set of use cases, ranging from traditional pub/sub scenarios to high volume real-time zero-loss tolerance transaction system. We believe this solution can be beneficial, so we would like to open source it to the community. Today, more than 100 companies are using the open source version of RocketMQ in their business. We also published a commercial distribution based on RocketMQ, a PaaS product called the Alibaba Cloud Platform.

? 可以知道RocketMQ是阿裏在使用ActiveMQ時,出現了IO瓶頸,無法滿足阿裏業務所需要的低延遲和高可靠性要求時自己研發出來。並且最終捐贈給Apache,成為頂級開源項目的。high volume real-time zero-loss tolerance transaction system是其核心特點。

? 下面通過一個簡單的示例,來說明RocketMQ的基本使用:

引入pom依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Producer

? Producer一般分為三種模式: 同步、異步和單向,具體代碼如下:

public class SimpleProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException,
            MQBrokerException, InterruptedException {
        /**
         * 同步消息發送: 一般用來進行通知、短信等重要消息的同步
         */
        // syncProducer();
        
        /**
         * 異步消息發送: 一般用來對方法調用響應時間有較嚴格要求的情況下,異步調用,立即返回
         * 不同於同步的唯一在於: send方法調用的時候多攜帶一個回調接口參數,用來異步處理消息發送結果
         */
        asyncProducer();
        
        /**
         * 單向模式: 一般用來對可靠性有一定要求的消息發送,例如日誌系統
         * 不同於同步的唯一之處在於: 調用的是sendOneway方法,且該方法不會給調用者任何返回值
         */
        // oneWayProducer();
    }

    private static void oneWayProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 創建Producer並且指定組名
        DefaultMQProducer oneWayProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        oneWayProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 啟動Producer
        oneWayProducer.start();

        // STEP4: 循環發送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("OneWayTopic", "TagA",
                    ("OneWayMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            oneWayProducer.sendOneway(message);
        }

        // STEP5: 關閉Producer
        oneWayProducer.shutdown();
    }

    private static void asyncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 創建Producer並且指定組名
        DefaultMQProducer asyncProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        asyncProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 啟動Producer
        asyncProducer.start();
        asyncProducer.setRetryTimesWhenSendAsyncFailed(0);      // 設置異步發送失敗重試次數,默認為2

        // STEP4: 循環發送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("AsyncTopic", "TagA",
                    ("AsyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 創建回調函數處理發送成功或者異常
            asyncProducer.send(message, new SendCallback() {
                
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                
                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }

        // STEP5: 關閉Producer
        TimeUnit.SECONDS.sleep(10); // 睡眠10秒,確保消息都發送出去
        asyncProducer.shutdown();
    }

    private static void syncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 創建Producer並且指定組名
        DefaultMQProducer syncProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        syncProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 啟動Producer
        syncProducer.start();

        // STEP4: 循環發送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("SyncTopic", "TagA",
                    ("SyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = syncProducer.send(message);
            System.out.println(sendResult);
        }
        
        // STEP5: 關閉Producer
        syncProducer.shutdown();
    }
}

Consumer

? consumer的實現就較為簡單了,定義一個事件監聽接口即可.

public class SimpleConsumer {
    public static void main(String[] args) throws MQClientException {
        // STEP1: 創建默認Consumer並指定
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        
        // STEP2: 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        
        // STEP3: 訂閱對應主題和tag
        consumer.subscribe("AsyncTopic", "*");
        
        // STEP4: 註冊接收到broker消息後的處理接口
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(new String(msgs.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        // STEP5: 啟動consumer (必須在註冊完消息監聽器之後啟動,否則會報錯)
        consumer.start();
        
        System.out.println("Consumer started......");
    }
}

總結

  • 運行Producer的時候必須保證nameServer和broker都正常運行,否則會報org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
  • 即使先運行Producer只要在運行Consumer之前,未重啟broker或者nameServer。Consumer啟動時還是能正常收到消息

參考鏈接

http://rocketmq.apache.org/docs/simple-example/

?

RocketMQ專題1:入門