1. 程式人生 > 其它 >RocketMQ簡單使用(一)簡單訊息、順序訊息、延遲訊息

RocketMQ簡單使用(一)簡單訊息、順序訊息、延遲訊息

研究下其簡單使用。

1. 簡單訊息

這裡使用三種訊息的傳送方式: 同步傳送、非同步傳送、單向傳送,以及訊息的消費。

  1. 同步傳送
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 同步傳送訊息: 可靠同步傳輸應用場景廣泛,如重要通知訊息、簡訊通知、簡訊營銷系統等
 */
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("syncProducer");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.13.111:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("syncTopic" /* Topic */,
                    "TagA" /* Tag */,
                    "keys" + i, /* Keys */
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

結果:

SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224D30000, offsetMsgId=C0A80D6F00002A9F000000000010B9F4, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E00001, offsetMsgId=C0A80D6F00002A9F000000000010BABD, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=1], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E40002, offsetMsgId=C0A80D6F00002A9F000000000010BB86, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=2], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F0000017CA018B4AAC25BF224E80003, offsetMsgId=C0A80D6F00002A9F000000000010BC4F, messageQueue=MessageQueue [topic=syncTopic, brokerName=DEFAULT_BROKER, queueId=3], queueOffset=50]
...
  1. 非同步傳送訊息
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 非同步傳送訊息: 一般用於響應時間敏感的業務場景。
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.13.111:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("asyncTopic",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

結果:

0          OK 7F000001631C18B4AAC25BF3A67E0000 
6          OK 7F000001631C18B4AAC25BF3A67E0002 
3          OK 7F000001631C18B4AAC25BF3A67E0005 
1          OK 7F000001631C18B4AAC25BF3A67E0007 
4          OK 7F000001631C18B4AAC25BF3A67E0004 
7          OK 7F000001631C18B4AAC25BF3A67E0001 
...
  1. 單向傳送
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 單向傳送訊息: 單向傳輸用於需要中等可靠性的情況,例如日誌收集
 */
public class OnewayProducer {

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.13.111:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("oneWayTopic" /* Topic */,
                    "TagB" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);
        producer.shutdown();
    }
}
  1. 消費者
package com.zd.bx.rocketmq.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 定義一個pull 消費者
//        DefaultLitePullConsumer consumer2 = new DefaultLitePullConsumer("myTestConsumerGroup");
        // 定義一個push消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myTestConsumerGroup");
        // 指定從第一條訊息開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定nameserver
        consumer.setNamesrvAddr("192.168.13.111:9876");
        // 指定消費的topic與tag
        consumer.subscribe("syncTopic", "*");
        // 指定使用 廣播模式進行消費,預設為叢集模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s body: %s %n", Thread.currentThread().getName(), msgs, new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2. 順序訊息

1. 什麼是順序訊息

順序訊息是指,嚴格按照訊息的傳送順序進行消費的訊息。
預設情況下,生產者會把以 RoundRobin 輪詢方式傳送到不同的Queue 分割槽佇列; 而消費訊息時會從多個Queue 上拉取訊息,這種情況下的傳送和消費是不能保證順序的。 如果將訊息僅傳送到同一個Queue 中,消費時也就從這個Queue 上拉取訊息,就保證了訊息的順序性。
舉個例子:訂單狀態佇列(ORDER_STATUS), 其下有四個Queue 佇列。我們傳送一個訂單的狀態時:001未支付-》001已支付-》001發貨中-》001發貨成功; 這四個狀態必須嚴格按照順序進行消費,所以就引入了順序訊息。

2. 有序性分類

全域性有序\分割槽有序。 全域性有序是指該topic只有一個佇列;分割槽有序是指在有多個Queue 的情況下,在定義Producer時我們指定訊息佇列選擇器,將相關的訊息傳送到相同的佇列,來保證在同一個佇列。

3. 程式碼演示

全域性有序很簡單,只需要將生產者佇列數量設定為1即可。
分割槽有序可以在調send 的時候傳遞MessageQueueSelector 進行佇列的負載均衡, 負載均衡肯定會選擇一個key 作為路由的值:可以是msg的ke, 也可以是呼叫send 的時候傳遞第三個引數。

package com.zd.bx.rocketmq.order;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderProducer");
        producer.setNamesrvAddr("192.168.13.111:9876");
        // 設定queue 的數量為1, 則為全域性有序
//        producer.setDefaultTopicQueueNums(1);
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msg.setKeys(i + "");
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 第一種方法是根據key 來進行路由
//                    Integer id = Integer.valueOf(msg.getKeys());
                    // 第二種就是傳遞引數來進行路由,send 方法的第三個引數會傳遞到arg引數上面
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

3. 延遲訊息

如果記得沒錯,rabbitMQ的延遲訊息需要藉助於訊息的生存時間和死信佇列實現延遲。

1. 延遲訊息的等級

延遲訊息的等級定義在服務端MessageStoreConfig.java 中,如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

比如,如果指定等級為3, 則延遲為10s, 延遲等級從1開始。如果想增加其他的配置,可以在rocketMQ安裝目錄的conf 目錄中進行配置。

2. 延遲訊息原理

如下圖:

具體實現方案是:
Producer 將訊息傳送到Broker 之後,Broker 先將訊息傳送到commitLog 檔案,然後將其分發到相應的consumerqueue。 不過,在分發之前,系統會判斷訊息中是否帶有延遲等級,沒有則直接正常分發;若有:
(1) 修改訊息的topic 為SCHEDULE_TOPIC_XXXX 目錄。
(2) 根據延時等級,在consumequeue 目錄中SCHEDULE_TOPIC_XXXX 主題下創建出相應的queueId 與consumequeue 檔案(如果沒有這些目錄與檔案)。這裡需要注意:延遲等級與queueId 的對應關係為 queueId = delayLevel -1
(3) 修改訊息索引單元內容。索引單元中的MessagetagHashCode 部分原本存放的是訊息的Tag的Hash 值。現在修改為訊息的投遞時間。投遞時間=訊息儲存時間+延遲時間, 也就是投遞時間存的是其真正需要被分發回原queue 的時間。
(4) 投遞延遲訊息:Broker 內部有一個延遲訊息服務類ScheduleMessageService,負責將訊息投遞到目標Topic(內部用Timer 定時器實現)
(5) 將訊息重新寫入commitlog,形成新的索引條目分發到相應的queue 中

3. 程式碼演示

package com.zd.bx.rocketmq.delay;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("delayProducer");
        producer.setNamesrvAddr("192.168.13.111:9876");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("delayTopicTest", ("Hello delay message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            producer.send(message);
        }

        producer.shutdown();
    }

}

如果想測試效果,可以先啟動消費者,然後啟動生產者檢視其效果。
啟動後到伺服器rocketMQ 儲存目錄檢視主題目錄其佇列資訊如下:(預設在當前使用者的store 目錄)

[root@redisnode1 consumequeue]# pwd
/root/store/consumequeue
[root@redisnode1 consumequeue]# ls
asyncTopic  delayTopicTest  oneWayTopic  SCHEDULE_TOPIC_XXXX  syncTopic  TopicTest
[root@redisnode1 consumequeue]# ls -R ./delayTopicTest/
./delayTopicTest/:
0  1  2  3

./delayTopicTest/0:
00000000000000000000

./delayTopicTest/1:
00000000000000000000

./delayTopicTest/2:
00000000000000000000

./delayTopicTest/3:
00000000000000000000
[root@redisnode1 consumequeue]# ls -R ./SCHEDULE_TOPIC_XXXX/
./SCHEDULE_TOPIC_XXXX/:
2

./SCHEDULE_TOPIC_XXXX/2:
00000000000000000000

可以看出第一次發出延遲訊息後會多出一個SCHEDULE_TOPIC_XXXX目錄,內部會根據訊息的延遲等級建立相應的queue 目錄(queueId = delayLevel -1 , 所以queue 的目錄ID為2)。

參考: https://rocketmq.apache.org/docs/simple-example/