RocketMQ Notes: Message Sending and Receiving Features in Practice

1. Sending messages in batches

To send a batch, pass a list of Message objects to the producer's send method.

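import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class BatchProducer {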
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        List<Message> messages = new ArrayList<Message>();
        messages.add(new Message("TopicTest", "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message("TopicTest", "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message("TopicTest", "TagA", "OrderID003", "Hello world 2".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            e.printStackTrace();
            //handle the error
        }
        //Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

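1.1 Message size limit

The message body is limited to 1 MB. If the total size of the list exceeds that limit, the list has to be split into smaller lists before sending.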

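import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }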
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}

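2. Tag

Tags are used for message filtering.

  • When a Producer sends a message, the message can carry only one Tag.
  • A Consumer can subscribe to multiple Tags and receives only the messages that match.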
       //Producer
        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

       //consumer
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                // Pass values as arguments so a '%' in the payload is not read as a format specifier.
                System.out.printf("%s Receive New Messages: %s%n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
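
To make the subscription expression more concrete, the following plain-Java sketch shows one way an expression such as "TagA || TagC || TagD" could be split into a set of tags and checked against the single tag of a message. TagFilterSketch and its methods are hypothetical names used only for illustration, the handling of "*" as "match everything" is an assumption of the sketch, and none of this is RocketMQ's own filtering code.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
class TagFilterSketch {

    // Split "TagA || TagC || TagD" into the set {TagA, TagC, TagD}.
    static Set<String> parse(String expression) {
        Set<String> tags = new LinkedHashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // A message carries one tag; it matches if that tag is in the subscribed set.
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression.trim())) {
            return true; // assumption of this sketch: "*" subscribes to every tag
        }
        return messageTag != null && parse(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        String subscription = "TagA || TagC || TagD";
        System.out.println(matches(subscription, "TagA")); // true
        System.out.println(matches(subscription, "TagB")); // false
    }
}
```

With the subscription shown above, the messages the producer sends with "TagA" would pass the filter, while a message tagged "TagB" would not be delivered to this consumer.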

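3. Key

A Key is usually the business-level unique identifier of a message (an order ID, for example). It makes the message easier to trace and query later, so try to keep Keys unique.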

            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
                    /* Message body */
            );
            msg.setKeys("orderId");

4. Setting the Name Server address

There are two ways to set the Name Server address:

  1. Set NAMESRV_ADDR as an environment variable
    export NAMESRV_ADDR=localhost:9876
  2. Call setNamesrvAddr in code; both the Producer and the Consumer need it
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("localhost:9876");

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("localhost:9876");

If the address is not set, an exception is thrown:

Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No name server address, please set it.
See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:560)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1069)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)

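5. Sending delayed messages

To send a delayed message, create the Message object, call setDelayTimeLevel(int level) on it to set the delay, and then send it. Arbitrary delays are not currently supported; only the preset durations are available (1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h). Levels are counted from 1, so setDelayTimeLevel(3), for example, means a delay of 10s.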

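import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {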

    public static void main(String[] args) throws Exception {
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        }

        // Shutdown producer after use.
        producer.shutdown();
    }

}
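
Because the level is just an index into the preset list, it can help to see the mapping spelled out. The sketch below is plain Java with no RocketMQ dependency: DelayLevelSketch and delayFor are hypothetical names, the table is copied from the list of durations given in this section, and rejecting out-of-range levels is a choice made for the sketch rather than a description of how the broker behaves.

```java
import java.time.Duration;

// Hypothetical lookup table for illustration only; not part of the RocketMQ client.
class DelayLevelSketch {

    // Preset delays in level order; level 1 is the first entry.
    private static final String[] LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    static Duration delayFor(int level) {
        if (level < 1 || level > LEVELS.length) {
            // Choice of this sketch: reject levels outside the table.
            throw new IllegalArgumentException("level must be 1.." + LEVELS.length);
        }
        String text = LEVELS[level - 1];
        long amount = Long.parseLong(text.substring(0, text.length() - 1));
        switch (text.charAt(text.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default:  throw new IllegalStateException("unknown unit in " + text);
        }
    }

    public static void main(String[] args) {
        System.out.println(delayFor(3));  // PT10S
        System.out.println(delayFor(18)); // PT2H
    }
}
```

A lookup like this is handy for picking the level closest to a desired delay before calling setDelayTimeLevel.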

6. Consumer thread count

The consumer uses a ThreadPoolExecutor internally to process messages, so you can change its size with setConsumeThreadMin and setConsumeThreadMax.
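
As a rough illustration of what the two settings bound, the plain-JDK sketch below builds a ThreadPoolExecutor from a minimum and a maximum thread count. ConsumePoolSketch and newPool are hypothetical names, and mapping the minimum to the core pool size and the maximum to the maximum pool size (with an unbounded queue) is an assumption of this sketch, not a statement about the client's internals.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
class ConsumePoolSketch {

    // Assumption: min maps to the core pool size, max to the maximum pool size.
    static ThreadPoolExecutor newPool(int minThreads, int maxThreads) {
        return new ThreadPoolExecutor(
            minThreads, maxThreads,
            60, TimeUnit.SECONDS,          // idle timeout for threads above the core size
            new LinkedBlockingQueue<>());  // unbounded queue of pending work
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool(4, 8);
        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.execute(() -> System.out.println(
                Thread.currentThread().getName() + " consumed message " + id));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

One JDK detail worth keeping in mind: a ThreadPoolExecutor only creates threads beyond its core size when its queue is full, so with an unbounded queue, as in this sketch, the minimum is what effectively determines the parallelism. On a real consumer the corresponding calls would be consumer.setConsumeThreadMin(...) and consumer.setConsumeThreadMax(...).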

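7. Consumer starting point

When you create a new Consumer Group, you need to decide whether it should consume the historical messages that already exist in the Broker.

  • CONSUME_FROM_FIRST_OFFSET consumes every message that exists in the Broker.
  • CONSUME_FROM_LAST_OFFSET ignores historical messages and consumes only what is produced afterwards.
  • CONSUME_FROM_TIMESTAMP consumes messages produced after a specified timestamp.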
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        //consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
        //        System.currentTimeMillis() - (1000 * 60 * 1)));
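
The timestamp passed to setConsumeTimestamp is a string rather than a number. The plain-JDK sketch below shows how such a string could be built, assuming the expected layout is yyyyMMddHHmmss (which is what UtilAll.timeMillisToHumanString3 is understood to produce). ConsumeTimestampSketch and format are hypothetical names, and the format assumption should be checked against the client version in use.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
class ConsumeTimestampSketch {

    // Assumed layout of the consume timestamp string: yyyyMMddHHmmss.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static String format(long epochMillis, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        // Same arithmetic as the commented-out snippet: one minute before now.
        long oneMinuteAgo = System.currentTimeMillis() - (1000L * 60 * 1);
        System.out.println(format(oneMinuteAgo, ZoneId.systemDefault()));
    }
}
```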

8. UserProperty

Attaches user-defined key-value pairs to a message.

            //Producer
            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
                    /* Message body */
            );
            msg.putUserProperty("test","testValue");
            //Consumer
            String val = msgs.get(0).getUserProperty("test");
            // println avoids treating the value as a printf format string
            System.out.println(val);

9. Batch consumption

Set the consumeMessageBatchMaxSize property; the default is 1.

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeMessageBatchMaxSize(10);
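
One way to picture the effect of this setting is that messages reach the listener in lists of at most that many entries. The plain-Java sketch below chunks a list accordingly. BatchSketch and partition are hypothetical names, and the client's real dispatching logic is not shown in these notes and may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
class BatchSketch {

    // Split items into consecutive batches of at most batchMaxSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchMaxSize) {
        if (batchMaxSize < 1) {
            throw new IllegalArgumentException("batchMaxSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchMaxSize) {
            int end = Math.min(start + batchMaxSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            pulled.add(i);
        }
        for (List<Integer> batch : partition(pulled, 10)) {
            System.out.println("batch of " + batch.size()); // 10, 10, then 5
        }
    }
}
```

With a batch size above 1, the msgs list passed to consumeMessage can hold several messages, so the listener should loop over the whole list rather than read only msgs.get(0).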