RocketMQ筆記-收發訊息特性實踐
阿新 • • 發佈:2018-12-18
RocketMQ筆記-收發訊息特性實踐
1.批量傳送訊息
其send方法引數是一個Message的列表
public class BatchProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); List<Message> messages = new ArrayList<Message>(); messages.add(new Message("TopicTest", "TagA", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message("TopicTest", "TagA", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message("TopicTest", "TagA", "OrderID003", "Hello world 2".getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); //handle the error } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
1.1 傳送訊息大小限制
訊息體最大限制為1M,如果列表的大小超過限制,則需要對列表進行分割處理
public class ListSplitter implements Iterator<List<Message>> { private final int SIZE_LIMIT = 1000 * 1000; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //for log overhead if (tmpSize > SIZE_LIMIT) { //it is unexpected that single message exceeds the SIZE_LIMIT //here just let it go, otherwise it will block the splitting process if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } //then you could split the large list into small ones: ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); //handle the error } }
2.Tag
用於訊息過濾
- Producer在傳送訊息時,一個訊息只能有一個Tag
- Consumer可以訂閱多個Tag,用於接收過濾的訊息
//Producer for (int i = 0; i < 100; i++){ Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //consumer consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
3.Key
Key一般用訊息在業務層面的唯一標識碼,可以方便後續跟蹤查詢,儘量保證Key的唯一性
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
msg.setKeys("orderId");
4.設定Name Server地址
有2種方法設定Name Server地址
- 在環境變數中設定NAMESRV_ADDR
export NAMESRV_ADDR=localhost:9876
2.在程式碼中呼叫setNamesrvAddr方法進行設定,Producer和Consumer都需要設定
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
producer.setNamesrvAddr("localhost:9876");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr("localhost:9876");
否則會拋異常
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No name server address, please set it.
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:560)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1069)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1023)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:214)
5. 傳送延遲訊息
延遲訊息的使用方法是在建立Message物件時,呼叫setDelayTimeLevel(intlevel)方法設定延遲時間,然後再把這個訊息傳送出去。目前延遲的時間不支援任意設定,僅支援預設值的時間長度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。比如setDelayTimeLevel(3)表示延遲10s。
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
// Launch producer
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
6. Consumer執行緒數設定
消費者使用一個 ThreadPoolExecutor 來處理內部的消費,因此您可以通過設定setConsumeThreadMin
或setConsumeThreadMax
來更改它。
7.Consumer啟動消費起點
當建立一個新的 Consumer Group 時,需要決定是否需要消費 Broker 中已經存在的歷史訊息。
- CONSUME_FROM_FIRST_OFFSET 將消耗 Broker 中存在的所有訊息。您還可以使用
- CONSUME_FROM_LAST_OFFSET 將忽略歷史訊息,並消費此後生成的任何內容。
- CONSUME_FROM_TIMESTAMP 來消費在指定的時間戳之後生成的訊息
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
//consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
// System.currentTimeMillis() - (1000 * 60 * 1)));
8.UserProperty
設定使用者自定義鍵值對資料
//Producer
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
msg.putUserProperty("test","testValue");
//Consumer
String val=msgs.get(0).getUserProperty("test");
System.out.printf(val);
9.批量消費
設定consumeMessageBatchMaxSize屬性,預設為1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeMessageBatchMaxSize(10);