1. 程式人生 > 其它 >rocketmq廣播訊息的(五)

rocketmq廣播訊息的(五)

一、簡介

廣播消費指的是:一條訊息被多個consumer消費,即使這些consumer屬於同一個ConsumerGroup,訊息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在訊息劃分方面無意義。

二、程式碼

/**
 * 釋出訂閱訊息生產者
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
        
// 1. 建立生產者物件 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 設定NameServer的地址,如果設定了環境變數NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.32.128:9876"); // 3. 啟動生產者 producer.start(); // 4. 生產者傳送訊息 for (int i = 0; i < 10; i++) { Message message
= new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("傳送結果:%s%n", result); } // 5. 停止生產者 producer.shutdown(); } }
/**
 * 釋出訂閱訊息生產者
 
*/ public class BroadcastProducer { public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException { // 1. 建立生產者物件 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 設定NameServer的地址,如果設定了環境變數NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr("192.168.32.128:9876"); // 3. 啟動生產者 producer.start(); // 4. 生產者傳送訊息 for (int i = 0; i < 10; i++) { Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("傳送結果:%s%n", result); } // 5. 停止生產者 producer.shutdown(); } }
這短短的一生我們最終都會失去,不妨大膽一點,愛一個人,攀一座山,追一個夢