1. 程式人生 > >rocketMq的訊息的生產和消費

rocketMq的訊息的生產和消費

    
生產者:
int orderId = 1000;//訂單號

DefaultMQProducer orderProducer = new DefaultMQProducer("orderProducerGroup");
orderProducer.setNamesrvAddr("ip1:port;ip2:port");
orderProducer.start();
for(int i = 0; i < 10; i ++){
    Message msg = new Message("topic", "tags", "keys" + i, "body".getBytes(RemotingHelper.DEFAULT_CHARSET));
    orderProducer.send(msg, new MessageQueueSelector(){
    @Override
    public MessageQueue select(List mqs, Message msg, Object arg){
        Integer id = (Integer)arg;
        int index = id%mqs.size();
        return mqs.get(index);
    }
}, orderId);
}
orderProducer.shutdown();

消費者:

DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumerGroup");
pushConsumer.setNamesrvAddr("ip1:port;ip2:port");
pushConsumer.subscribe("topic", "*");//訂閱主題
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
pushCosumer.setMessageModel(MessageModel.CLUSTERING);//叢集消費、預設也是“叢集消費”
pushConsumer.registerMessageListener(new MessageListenerOrderly(){
    @Override
    public ConsumeOrderStatus consumeMessage(List msgs, ConsumeOrderlyContext context){
        for(int i = 0; i < msgs.size(); i ++){
            Message msg = msgs.get(i);
            System.err.println("topic:" + msg.getTopic() + ",keys:" + msg.getKeys());
        }
        return ConsumeOrderStatus.SUCCESS;
    }
});
pushConsumer.start();





3、批量訊息
      示例程式碼:
     
DefaultMQProducer batchProducer = new DefaultMQProducer("bathProducerGroup");
batchProducer.setNamesrvAddr("ip1:port;ip2:port");
batchProducer.start();
List msgs = new ArrayList();
msgs.add(msg1);
msgs.add(msg2);
..........
batchProducer.send(msgs);
batchProducer.shutdown();

4、延遲或定時訊息
      說明:rocketMq目前只支援固定精度的定時訊息, 官方說法如下: 如果要支援任意的時間精度,在 Broker 層面,必須要做訊息排序,如果再涉及到持久化,那麼訊息排序要不可避免的產生巨大效能開銷。

延遲級別:
延遲級別 時間
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

示例程式碼:
生產者:

DefaultMQProducer delayProducer = new DefaultMQProducer("delayProducerGroup");
delayProducer.setNamesrvAddr("ip1:port;ip2:port");
delayProducer.start();

Message delayMsg = new Message("topic", "tags" , "keys", "binary content");
delayMsg.setDelayTimeLevel(3);//延遲10s被消費
delayProducer.send(delayMsg);

delayProducer.shutdown();



消費者:

DefaultMQConsumer consume = new DefaultMQConsumer("consumeGroup");
consume.setNamesrvAddr("ip1:port;ip2:port");
consume.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consume.registerMessageListener(new MessageListenerConcurrently(){
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context){
        for(int i = 0; i < msgs.size(); i ++){
            Message msg = msgs.get(i);
            System.out.println("topic:" + msg.getTopic() + "keys:" + msg.getKeys());
        }
        return ConsumeConcurrentlyContext.CONSUME_SUCCESS;
    }
});


5、過濾訊息
     說明:rocketMq提供了簡單的訊息過濾功能,僅支援一些簡單的語法,