1. 程式人生 > 其它 >Rocketmq-client 傳送簡單訊息、順序訊息、延遲訊息以及消費者簡單例子

Rocketmq-client 傳送簡單訊息、順序訊息、延遲訊息以及消費者簡單例子

1. 簡單訊息

這裡使用三種訊息的傳送方式: 同步傳送、非同步傳送、單向傳送,以及訊息的消費。

  1. 同步傳送
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/** * 同步傳送訊息: 可靠同步傳輸應用場景廣泛,如重要通知訊息、簡訊通知、簡訊營銷系統等 */ public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("syncProducer"); // Specify name server addresses.
producer.setNamesrvAddr("192.168.13.111:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("syncTopic" /* Topic */,
"TagA" /* Tag */, "keys" + i, /* Keys */ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }

 結果:

SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224D30000, offsetMsgId=C0A80D6F00002A9F000000000010B9F4, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E00001, offsetMsgId=C0A80D6F00002A9F000000000010BABD, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=1], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E40002, offsetMsgId=C0A80D6F00002A9F000000000010BB86, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=2], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E80003, offsetMsgId=C0A80D6F00002A9F000000000010BC4F, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=50]
...
  1. 非同步傳送訊息
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 非同步傳送訊息: 一般用於響應時間敏感的業務場景。
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.13.111:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("asyncTopic",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

 

 結果:

0          OK 7F000001631C18B4AAC25BF3A67E0000 
6          OK 7F000001631C18B4AAC25BF3A67E0002 
3          OK 7F000001631C18B4AAC25BF3A67E0005 
1          OK 7F000001631C18B4AAC25BF3A67E0007 
4          OK 7F000001631C18B4AAC25BF3A67E0004 
7          OK 7F000001631C18B4AAC25BF3A67E0001 
...
  1. 單向傳送
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 單向傳送訊息: 單向傳輸用於需要中等可靠性的情況,例如日誌收集
 */
public class OnewayProducer {

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.13.111:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("oneWayTopic" /* Topic */,
                    "TagB" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);
        producer.shutdown();
    }
}

 

  1. 消費者

(1)推送模式的訊息消費者

package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
        // 指定從第一條訊息開始消費
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定nameserver
        consumer.setNamesrvAddr("192.168.13.111:9876");
        // 指定消費的topic與tag
        consumer.subscribe("syncTopic", "*");
        // 指定使用 廣播模式進行消費,預設為叢集模式
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s body: %s %n", Thread.currentThread().getName(), msgs, new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

(2)拉取模式的消費者程式碼

package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PullConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 1、建立DefaultLitePullConsumer物件
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
        // 2、設定namesrv地址
        litePullConsumer.setNamesrvAddr("192.168.13.111:9876");
        // 3、訂閱消費主題
        litePullConsumer.subscribe("syncTopic", "*");
        // 4、啟動消費物件
        litePullConsumer.start();
        // 可以修改預設的阻塞時間
//        litePullConsumer.setPollTimeoutMillis(0);
        try {
            // 5、迴圈開始消費訊息
            while (true) {
                // 拉取訊息,無訊息時會阻塞 (預設會阻塞5s, 沒有訊息則返回一個空集合)
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s messageExts.size(): %s %n",System.currentTimeMillis(), messageExts.size());
                messageExts.stream().forEach(msg -> {
                    // 業務邏輯
                    System.out.printf("%s Receive New Messages, body: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                });
                // 同步消費位置。不執行該方法,應用重啟會存在重複消費。
                if (messageExts.size() > 0) {
                    litePullConsumer.commitSync();
                }
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

 

(3)關於訊息消費模式廣播模式與叢集模式的區別
  廣播消費模式下:相同ConsumerGroup 的每個Consumer例項都接收同一個Topic 的全量訊息,即每條訊息都會被髮送到ConsumerGroup 中的每個每個Consumer。消費進度儲存在Consumer端,因為廣播模式每個Consumer 都會消費,但是其進度不同。
  叢集消費模式下(預設,又被稱為負載均衡模式),相同ConsumerGroup 的每個Consumer 例項平攤同一個Topic 的訊息,即訊息只會被髮送到ConsumerGroup的某個Consumer。消費進度儲存在broker 中。因為同一條訊息只被消費一次,所以需要服務端broker 進行記錄下次進行合理的投遞。
  簡單理解。對於多例項的Consumer(同時啟動多個相同的Consumer),叢集模式會平攤Topic的訊息;廣播模式,每個例項都會消費所有的訊息的訊息,出現重複消費。 由於上述的特性,消費進度儲存也略有不同,叢集模式下消費進度儲存在broker中;廣播模式儲存在consumer例項中。服務端可以檢視叢集模式的消費進度:

 
[root@redisnode1 config]# pwd
/root/store/config
[root@redisnode1 config]# cat consumerOffset.json
{
        "offsetTable":{
                "syncTopic@myTestConsumerGroup":{0:124,1:128,2:124,3:124
                },
                "%RETRY%myTestConsumerGroup@myTestConsumerGroup":{0:0
                },
                "syncTopic@LitePullConsumer":{0:115,1:118,2:117,3:115
                }
        }
}

 

2. 順序訊息

1. 什麼是順序訊息

順序訊息是指,嚴格按照訊息的傳送順序進行消費的訊息。
預設情況下,生產者會把以 RoundRobin 輪詢方式傳送到不同的Queue 分割槽佇列; 而消費訊息時會從多個Queue 上拉取訊息,這種情況下的傳送和消費是不能保證順序的。 如果將訊息僅傳送到同一個Queue 中,消費時也就從這個Queue 上拉取訊息,就保證了訊息的順序性。
舉個例子:訂單狀態佇列(ORDER_STATUS), 其下有四個Queue 佇列。我們傳送一個訂單的狀態時:001未支付-》001已支付-》001發貨中-》001發貨成功; 這四個狀態必須嚴格按照順序進行消費,所以就引入了順序訊息。

2. 有序性分類

全域性有序\分割槽有序。 全域性有序是指該topic只有一個佇列;分割槽有序是指在有多個Queue 的情況下,在定義Producer時我們指定訊息佇列選擇器,將相關的訊息傳送到相同的佇列,來保證在同一個佇列。

3. 程式碼演示

全域性有序很簡單,只需要將生產者佇列數量設定為1即可。
分割槽有序可以在調send 的時候傳遞MessageQueueSelector 進行佇列的負載均衡, 負載均衡肯定會選擇一個key 作為路由的值:可以是msg的key, 也可以是呼叫send 的時候傳遞第三個引數。

 
package com.zd.bx.rocketmq.order;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderProducer");
        producer.setNamesrvAddr("192.168.13.111:9876");
        // 設定queue 的數量為1, 則為全域性有序
//        producer.setDefaultTopicQueueNums(1);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msg.setKeys(i + "");
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 第一種方法是根據key 來進行路由
//                    Integer id = Integer.valueOf(msg.getKeys());
                    // 第二種就是傳遞引數來進行路由,send 方法的第三個引數會傳遞到arg引數上面
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

 

3. 延遲訊息

如果記得沒錯,rabbitMQ的延遲訊息需要藉助於訊息的生存時間和死信佇列實現延遲。

1. 延遲訊息的等級

延遲訊息的等級定義在服務端MessageStoreConfig.java 中,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

比如,如果指定等級為3, 則延遲為10s, 延遲等級從1開始。如果想增加其他的配置,可以在rocketMQ安裝目錄的conf 目錄中進行配置。

2. 延遲訊息原理

如下圖:

 

 具體實現方案是:
Producer 將訊息傳送到Broker 之後,Broker 先將訊息傳送到commitLog 檔案,然後將其分發到相應的consumerqueue。 不過,在分發之前,系統會判斷訊息中是否帶有延遲等級,沒有則直接正常分發;若有:
(1) 修改訊息的topic 為SCHEDULE_TOPIC_XXXX 目錄。
(2) 根據延時等級,在consumequeue 目錄中SCHEDULE_TOPIC_XXXX 主題下創建出相應的queueId 與consumequeue 檔案(如果沒有這些目錄與檔案)。這裡需要注意:延遲等級與queueId 的對應關係為 queueId = delayLevel -1
(3) 修改訊息索引單元內容。索引單元中的MessagetagHashCode 部分原本存放的是訊息的Tag的Hash 值。現在修改為訊息的投遞時間。投遞時間=訊息儲存時間+延遲時間, 也就是投遞時間存的是其真正需要被分發回原queue 的時間。
(4) 投遞延遲訊息:Broker 內部有一個延遲訊息服務類ScheduleMessageService,負責將訊息投遞到目標Topic(內部用Timer 定時器實現)
(5) 將訊息重新寫入commitlog,形成新的索引條目分發到相應的queue 中

3. 程式碼演示

package com.zd.bx.rocketmq.delay;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("delayProducer");
        producer.setNamesrvAddr("192.168.13.111:9876");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("delayTopicTest", ("Hello delay message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            producer.send(message);
        }

        producer.shutdown();
    }

}

如果想測試效果,可以先啟動消費者,然後啟動生產者檢視其效果。
啟動後到伺服器rocketMQ 儲存目錄檢視主題目錄其佇列資訊如下:(預設在當前使用者的store 目錄)

[root@redisnode1 consumequeue]# pwd
/root/store/consumequeue
[root@redisnode1 consumequeue]# ls
asyncTopic  delayTopicTest  oneWayTopic  SCHEDULE_TOPIC_XXXX  syncTopic  TopicTest
[root@redisnode1 consumequeue]# ls -R ./delayTopicTest/
./delayTopicTest/:
0  1  2  3

./delayTopicTest/0:
00000000000000000000

./delayTopicTest/1:
00000000000000000000

./delayTopicTest/2:
00000000000000000000

./delayTopicTest/3:
00000000000000000000
[root@redisnode1 consumequeue]# ls -R ./SCHEDULE_TOPIC_XXXX/
./SCHEDULE_TOPIC_XXXX/:
2

./SCHEDULE_TOPIC_XXXX/2:
00000000000000000000

可以看出第一次發出延遲訊息後會多出一個SCHEDULE_TOPIC_XXXX目錄,內部會根據訊息的延遲等級建立相應的queue 目錄(queueId = delayLevel -1 , 所以queue 的目錄ID為2)。

 

 

轉載:https://www.cnblogs.com/qlqwjy/p/16023818.html

 springboot整合,使用rocketmq-spring傳送訊息可以參考github官網:https://github.com/apache/rocketmq-spring,檢視如下包中的例子即可學會使用生產者和消費者:

 

 rocketmq-spring 使用指南:https://github.com/apache/rocketmq-spring/wiki

 

 

    TRANSLATE with x English
Arabic Hebrew Polish
Bulgarian Hindi Portuguese
Catalan Hmong Daw Romanian
Chinese Simplified Hungarian Russian
Chinese Traditional Indonesian Slovak
Czech Italian Slovenian
Danish Japanese Spanish
Dutch Klingon Swedish
English Korean Thai
Estonian Latvian Turkish
Finnish Lithuanian Ukrainian
French Malay Urdu
German Maltese Vietnamese
Greek Norwegian Welsh
Haitian Creole Persian  
  TRANSLATE with COPY THE URL BELOW Back EMBED THE SNIPPET BELOW IN YOUR SITE Enable collaborative features and customize widget: Bing Webmaster Portal Back     此頁面的語言為英語   翻譯為        
  • 中文(簡體)
  • 中文(繁體)
  • 丹麥語
  • 烏克蘭語
  • 烏爾都語
  • 亞美尼亞語
  • 俄語
  • 保加利亞語
  • 克羅埃西亞語
  • 冰島語
  • 加泰羅尼亞語
  • 匈牙利語
  • 卡納達語
  • 印地語
  • 印尼語
  • 古吉拉特語
  • 哈薩克語
  • 土耳其語
  • 威爾士語
  • 孟加拉語
  • 尼泊爾語
  • 布林語(南非荷蘭語)
  • 希伯來語
  • 希臘語
  • 庫爾德語
  • 德語
  • 義大利語
  • 拉脫維亞語
  • 挪威語
  • 捷克語
  • 斯洛伐克語
  • 斯洛維尼亞語
  • 旁遮普語
  • 日語
  • 普什圖語
  • 毛利語
  • 法語
  • 波蘭語
  • 波斯語
  • 泰盧固語
  • 泰米爾語
  • 泰語
  • 海地克里奧爾語
  • 愛沙尼亞語
  • 瑞典語
  • 立陶宛語
  • 緬甸語
  • 羅馬尼亞語
  • 寮國語
  • 芬蘭語
  • 英語
  • 荷蘭語
  • 薩摩亞語
  • 葡萄牙語
  • 西班牙語
  • 越南語
  • 亞塞拜然語
  • 阿姆哈拉語
  • 阿爾巴尼亞語
  • 阿拉伯語
  • 韓語
  • 馬爾加什語
  • 馬拉地語
  • 馬拉雅拉姆語
  • 馬來語
  • 馬耳他語
  • 高棉語