1. 程式人生 > >RocketMQ(2)

RocketMQ(2)

增加 集群 HERE ++ 生產 || ext tom print

1. 消費端集群消費(負載均衡)

 示例代碼:

技術分享圖片
/**
 * Producer,發送消息
 * 
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        
for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest",// topic "Tag1",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); }
catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } /** * Consumer,訂閱消息 */ public class Consumer1 { public Consumer1() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr(
"192.168.32.135:9876;192.168.32.136:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); System.out.println("======暫停====="); Thread.sleep(60000); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer1 consumer1 = new Consumer1(); System.out.println("Consumer1 Started."); } } /** * Consumer,訂閱消息 */ public class Consumer2 { public Consumer2() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); } } catch 
(UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer2 consumer2 = new Consumer2(); System.out.println("Consumer2 Started."); } }
View Code

一個生產者,兩個消費者,注意兩個消費者的組名要一樣。

先啟動兩個消費者(consumer1,consumer2),通過控制臺查看如下:

技術分享圖片

再啟動生產者生成100條消息,消費情況如下:

技術分享圖片

技術分享圖片

生成的100條消息被consumer1和consumer2平均的消費了。可以通過consumer.setAllocateMessageQueueStrategy去設置分配策略。

BTW:這是默認的模式,可以通過consumer.setMessageModel設置,MessageModel.CLUSTERING | MessageModel.BROADCASTING,如果是廣播消費,則每個客戶端都會收到生產端的所有消息

2.消息未響應會重發

代碼示例:

技術分享圖片
/**
 * Producer: sends a single message to topic "TopicTest" with tag "Tag1".
 */
public class Producer {
    /**
     * Starts a producer in group "message_producer", sends the message, prints the
     * send result and finally shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException    if the producer cannot be started
     * @throws InterruptedException if the back-off sleep after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        // Two name-server addresses, separated by ';'.
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        try {
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg = new Message("TopicTest", // topic
                            "Tag1", // tag
                            // Encode explicitly as UTF-8: the consumers decode the body as "utf-8".
                            ("Hello RocketMQ " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8) // body
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    // A failed send is reported, then the loop pauses before continuing.
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Shut down even when the back-off sleep above is interrupted.
            producer.shutdown();
        }
    }
}


/**
 * Consumer in group "message_consumer" that subscribes to "TopicTest" and blocks
 * for 600000 ms after every message it prints, so that it can be force-stopped
 * before it returns a consume status.
 */
public class Consumer1 {

    /**
     * Creates, configures and starts the push consumer. A failure to subscribe or
     * start is only printed; the constructor still returns normally.
     */
    public Consumer1() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /** Listener that prints every received message and then sleeps. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints topic, body and tags of each message, sleeping 600000 ms after each one.
         *
         * @param msgs    the batch of messages handed to this listener
         * @param context consume context (unused)
         * @return {@code CONSUME_SUCCESS} when the whole batch was processed,
         *         {@code RECONSUME_LATER} when any exception occurred
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);

                    System.out.println("======暫停=====");
                    Thread.sleep(600000);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag that was cleared when the exception was thrown,
                // so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("Consumer1 Started.");
    }
}


/**
 * Consumer in group "message_consumer" that subscribes to "TopicTest" and prints
 * every message it receives.
 */
public class Consumer2 {

    /**
     * Builds the push consumer, wires in the listener and starts it. An
     * {@code MQClientException} is printed and otherwise ignored.
     */
    public Consumer2() {
        try {
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("message_consumer");
            pushConsumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            pushConsumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            pushConsumer.registerMessageListener(new Listener());
            pushConsumer.start();
        } catch (MQClientException startFailure) {
            startFailure.printStackTrace();
        }
    }

    /** Prints each delivered message to standard output. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints one line per message in the batch.
         *
         * @param msgs    the batch of messages handed to this listener
         * @param context consume context (unused)
         * @return {@code RECONSUME_LATER} if a body cannot be decoded,
         *         otherwise {@code CONSUME_SUCCESS}
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt received : msgs) {
                    System.out.println(describe(received));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (UnsupportedEncodingException decodeFailure) {
                decodeFailure.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }

        /** Formats topic, decoded body and tags of a single message into the printed line. */
        private String describe(MessageExt received) throws UnsupportedEncodingException {
            String topicName = received.getTopic();
            String payload = new String(received.getBody(), "utf-8");
            String tagText = received.getTags();
            return "收到消息:" + " topic:" + topicName + " msgBody:" + payload + " tags:" + tagText;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        new Consumer2();
        System.out.println("Consumer2 Started.");
    }
}
View Code

先啟動consumer1,再啟動consumer2,最後啟動producer

技術分享圖片技術分享圖片

consumer1收到了消息,consumer2沒有收到消息,這時把consumer1強制停止,也就是說consumer1不會給MQ返回響應,查看結果:

技術分享圖片

consumer2也收到消息了,說明在MQ沒收到消費端響應的情況下,會重發消息。

3. 修改topic的隊列數

默認的隊列數是4個,可以從執行結果中看出:queueId都是0-3

技術分享圖片

細節可以看https://www.cnblogs.com/dyfh/p/4113677.html

可以增加設置producer.createTopic("TopicTest", "TopicTest", 8);

技術分享圖片

RocketMQ(2)