Producing and Consuming Messages with RocketMQ
阿新 • Published: 2019-01-28
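Producer:
// Package names as in RocketMQ 4.x
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

int orderId = 1000; // order number
DefaultMQProducer orderProducer = new DefaultMQProducer("orderProducerGroup");
orderProducer.setNamesrvAddr("ip1:port;ip2:port");
orderProducer.start();
for (int i = 0; i < 10; i++) {
    Message msg = new Message("topic", "tags", "keys" + i,
            "body".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Every message of the same order goes to the same queue, which preserves its order.
    orderProducer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer id = (Integer) arg; // the orderId passed as the last argument of send()
            int index = id % mqs.size();
            return mqs.get(index);
        }
    }, orderId);
}
orderProducer.shutdown();

Consumer:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumerGroup");
pushConsumer.setNamesrvAddr("ip1:port;ip2:port");
pushConsumer.subscribe("topic", "*"); // subscribe to the topic, all tags
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
pushConsumer.setMessageModel(MessageModel.CLUSTERING); // clustering mode, which is also the default
pushConsumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.err.println("topic:" + msg.getTopic() + ",keys:" + msg.getKeys());
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
pushConsumer.start();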
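The selector in the producer picks a queue by taking the order number modulo the number of queues, so every message that carries the same order number lands in the same queue. The arithmetic can be tried without a broker. This is a minimal sketch, not RocketMQ code: the class `QueueSelectionSketch` and its methods are hypothetical, and plain strings stand in for `MessageQueue` objects.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch: plain strings stand in for RocketMQ MessageQueue objects.
class QueueSelectionSketch {

    // Order id modulo the queue count, as in the selector of the producer example.
    // Math.floorMod keeps the index non-negative for a negative id; that is a
    // defensive choice of this sketch, the original code uses the plain % operator.
    static int selectIndex(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    // Returns the queue that all messages of the given order would be sent to.
    static String select(List<String> queues, int orderId) {
        return queues.get(selectIndex(orderId, queues.size()));
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        // Three messages of order 1000 all map to the same queue.
        for (int i = 0; i < 3; i++) {
            System.out.println("order 1000, message " + i + " -> " + select(queues, 1000));
        }
        // A different order number may map to a different queue.
        System.out.println("order 1001 -> " + select(queues, 1001));
    }
}
```

With four queues, order 1000 always maps to index 0 and order 1001 to index 1. Note that the mapping depends on the number of queues, so it shifts if the queue count of the topic changes.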
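3. Batch messages
Sample code:
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

DefaultMQProducer batchProducer = new DefaultMQProducer("batchProducerGroup");
batchProducer.setNamesrvAddr("ip1:port;ip2:port");
batchProducer.start();
// msg1, msg2, ... are Message instances created beforehand
List<Message> msgs = new ArrayList<>();
msgs.add(msg1);
msgs.add(msg2);
// ..........
batchProducer.send(msgs); // sends the whole list in one call
batchProducer.shutdown();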
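Because the whole list goes out in one `send` call, a very long list may need to be cut into smaller sub-lists first. The following is a minimal sketch of one way to do that, under stated assumptions: the class `BatchSplitSketch` and the `maxBytes` parameter are hypothetical, byte arrays stand in for message bodies, and only the body length is counted (a real message also carries a topic and properties). The size limit that actually applies depends on the RocketMQ version and configuration, so it is left as a parameter.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class BatchSplitSketch {

    // Greedily packs bodies into batches whose total size stays within maxBytes.
    // A single body larger than maxBytes ends up in a batch of its own.
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            // Close the current batch if adding this body would exceed the limit.
            if (!current.isEmpty() && currentSize + body.length > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            bodies.add(("body" + i).getBytes(StandardCharsets.UTF_8)); // 5 bytes each
        }
        // With a 12-byte limit, a batch holds at most two 5-byte bodies.
        for (List<byte[]> batch : split(bodies, 12)) {
            System.out.println("batch of " + batch.size() + " message(s)");
        }
    }
}
```

In a real producer, each sub-list would be a `List<Message>` passed to `batchProducer.send(...)` in turn.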
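4. Delayed or scheduled messages
Note: RocketMQ currently supports scheduled messages only at a fixed set of delay levels. The official explanation is as follows: supporting arbitrary time precision would require sorting messages at the Broker level, and once persistence is involved, that sorting would inevitably incur a huge performance overhead.
Delay levels:
Delay level | Time |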
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
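The table can be read as a lookup from a 1-based level to a fixed delay. The sketch below encodes the 18 entries as a string and converts a level to milliseconds; it also finds the smallest level that covers a wanted delay, since arbitrary delays cannot be requested. This is an illustration only, not RocketMQ's own code: the class `DelayLevelSketch` and its methods are hypothetical.

```java
class DelayLevelSketch {

    // The 18 delays from the table, in level order (level 1 is the first entry).
    static final String LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns the delay in milliseconds for a 1-based delay level.
    static long delayMillis(int level) {
        String[] parts = LEVELS.split(" ");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("level must be between 1 and " + parts.length);
        }
        String spec = parts[level - 1];
        long value = Long.parseLong(spec.substring(0, spec.length() - 1));
        char unit = spec.charAt(spec.length() - 1);
        switch (unit) {
            case 's': return value * 1000L;
            case 'm': return value * 60_000L;
            case 'h': return value * 3_600_000L;
            default: throw new IllegalStateException("unknown unit: " + unit);
        }
    }

    // Returns the smallest level whose delay is at least wantedMillis, or -1 if none is.
    static int levelAtLeast(long wantedMillis) {
        int count = LEVELS.split(" ").length;
        for (int level = 1; level <= count; level++) {
            if (delayMillis(level) >= wantedMillis) {
                return level;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("level 3 = " + delayMillis(3) + " ms");
        System.out.println("level covering 45 s = " + levelAtLeast(45_000L));
    }
}
```

For example, a wanted delay of 45 seconds falls between level 4 (30s) and level 5 (1m), so the closest level that does not fire early is 5.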
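Sample code:
Producer:
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

DefaultMQProducer delayProducer = new DefaultMQProducer("delayProducerGroup");
delayProducer.setNamesrvAddr("ip1:port;ip2:port");
delayProducer.start();
// The message body must be a byte array.
Message delayMsg = new Message("topic", "tags", "keys",
        "binary content".getBytes(StandardCharsets.UTF_8));
delayMsg.setDelayTimeLevel(3); // level 3: delivered to consumers after 10s
delayProducer.send(delayMsg);
delayProducer.shutdown();

Consumer:
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

DefaultMQPushConsumer consume = new DefaultMQPushConsumer("consumeGroup");
consume.setNamesrvAddr("ip1:port;ip2:port");
consume.subscribe("topic", "*"); // subscribe to the topic the producer writes to
consume.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consume.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("topic:" + msg.getTopic() + ",keys:" + msg.getKeys());
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consume.start(); // the consumer receives nothing until it is started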
5. Filtering messages
Note: RocketMQ provides a simple message filtering feature that supports only some simple syntax,