1. 程式人生 > >RocketMQ原理學習--訊息型別

RocketMQ原理學習--訊息型別

一、叢集訊息與廣播訊息

叢集消費:當使用叢集消費模式時,MQ 認為任意一條訊息只需要被叢集內的任意一個消費者處理即可。
廣播消費:當使用廣播消費模式時,MQ 會將每條訊息推送給叢集內所有註冊過的客戶端,保證訊息至少被每臺機器消費一次。

叢集消費模式:

適用場景&注意事項

  • 消費端叢集化部署,每條訊息只需要被處理一次。
  • 由於消費進度在服務端維護,可靠性更高。
  • 叢集消費模式下,每一條訊息都只會被分發到一臺機器上處理,如果需要被叢集下的每一臺機器都處理,請使用廣播模式。
  • 叢集消費模式下,不保證訊息的每一次失敗重投等邏輯都能路由到同一臺機器上,因此處理訊息時不應該做任何確定性假設。

廣播消費模式:

適用場景&注意事項

  • 順序訊息暫不支援廣播消費模式。
  • 每條訊息都需要被相同邏輯的多臺機器處理。
  • 消費進度在客戶端維護,出現重複的概率稍大於叢集模式。
  • 廣播模式下,MQ 保證每條訊息至少被每臺客戶端消費一次,但是並不會對消費失敗的訊息進行失敗重投,因此業務方需要關注消費失敗的情況。
  • 廣播模式下,第一次啟動時預設從最新訊息消費,客戶端的消費進度是被持久化在客戶端本地的隱藏檔案中,因此不建議刪除該隱藏檔案,否則會丟失部分訊息。
  • 廣播模式下,每條訊息都會被大量的客戶端重複處理,因此推薦儘可能使用叢集模式。
  • 廣播模式下服務端不維護消費進度,所以 MQ 控制檯不支援訊息堆積查詢和堆積報警功能。

程式碼示例:

設定叢集訊息:consumer.setMessageModel(MessageModel.CLUSTERING);

設定廣播訊息:consumer.setMessageModel(MessageModel.BROADCASTING);

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        consumer.setNamesrvAddr("localhost:9876");

        //叢集消費者
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        //廣播消費者
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

二、普通訊息、事物訊息、順序訊息、延時訊息

RocketMQ 針對不同的業務場景還提供了普通訊息、事物訊息、順序訊息和延時訊息等幾種訊息型別。

1、普通訊息

普通訊息也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒有保證,可能先發送的訊息先消費,也可能先發送的訊息後消費。

舉個簡單例子,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通訊息。

因為不需要保證訊息的順序,所以訊息可以大規模併發地傳送和消費,吞吐量很高,適合大部分場景。

示例:

生產者

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
       producer.shutdown();
    }

}

消費者

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        consumer.setNamesrvAddr("localhost:9876");
        //consumer.setInstanceName("rmq-instance2");
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

2、事物訊息

MQ 的事務訊息互動流程如下圖所示:

MQ 事務訊息互動流程

採用2PC提交:

第一階段是:步驟1,2,3。
第二階段是:步驟4,5。

生產者:

public class TransactionProducer {

    public static void main(String [] args) throws Exception{

        final TransactionMQProducer producer = new TransactionMQProducer("rmq-transaction");
        producer.setNamesrvAddr("localhost:9876");

        //事務回查最小併發數

        producer.setCheckThreadPoolMinSize(5);

        //事務回查最大併發數

        producer.setCheckThreadPoolMaxSize(20);

        //佇列數

        producer.setCheckRequestHoldMax(2000);

        producer.start();

        //伺服器回撥producer,檢查本地事務分支成功還是失敗
        producer.setTransactionCheckListener(new TransactionCheckListener() {

            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {

                System.out.println("state --" + new String(messageExt.getBody()));

                return LocalTransactionState.COMMIT_MESSAGE;

            }

        });

        TransactionExecuterImpl transactionExecuter = new TransactionExecuterImpl();

        for (int i = 0; i < 2; i++) {

            Message msg = new Message("TopicTransaction",

                    "Transaction" + i,

                    ("Hello RocketMq" + i).getBytes()

            );

            SendResult sendResult = producer.sendMessageInTransaction(msg, transactionExecuter, "tq");

            System.out.println(sendResult);

            TimeUnit.MICROSECONDS.sleep(1000);

        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {

                producer.shutdown();

            }

        }));

        System.exit(0);

    }
}

執行本地事物:

public class TransactionExecuterImpl implements LocalTransactionExecuter {

    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {

        System.out.println("msg=" + new String(msg.getBody()));

        System.out.println("arg = "+arg);

        String tag = msg.getTags();

        if (tag.equals("Transaction1")){

            //這裡有一個分階段提交的概念

            System.out.println("這裡是處理業務邏輯,失敗情況下進行ROLLBACK");

            return LocalTransactionState.ROLLBACK_MESSAGE;

        }

        return LocalTransactionState.COMMIT_MESSAGE;

        //return LocalTransactionState.UNKNOW;

    }

}

消費者:

public class TransactionConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-transaction");
        consumer.setNamesrvAddr("localhost:9876");
        //consumer.setInstanceName("rmq-instance2");
        consumer.subscribe("TopicTransaction", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

3、順序訊息

有序訊息就是按照一定的先後順序的訊息型別。

舉個例子來說,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序也就是 1、2、3 ,而不會出現普通訊息那樣的 2、1、3 等情況。

那麼有序訊息是如何保證的呢?我們都知道訊息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那麼要保證訊息的有序,勢必這兩步都是要保證有序的,即要保證訊息是按有序傳送到 broker,broker 也是有序將訊息投遞給 consumer,兩個條件必須同時滿足,缺一不可。
進一步還可以將有序訊息分成

  • 全域性有序訊息
  • 區域性有序訊息

實現原理:由於生產者預設是輪詢獲取MessageQueue佇列(每個Topic預設初始化4個MessageQueue),然後將訊息輪詢傳送到不同的MessageQueue中,訊息者從MessageQueue中獲取資料時很可能是無序的。

區域性有序訊息:將相同順序的訊息傳送到同一個MessageQueue佇列,這樣消費者從佇列中獲取資料肯定是相對有序的。

全域性有序訊息:將所有的訊息傳送到一個MessageQueue佇列,消費者從單個佇列中拉取訊息,訊息有序。

生產者:實現MessageQueueSelector介面,相同順序的訊息獲取同一個MessageQueue

public class OrderProducer {

    public static void main(String[] args) throws Exception {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
            producer.setNamesrvAddr("localhost:9876");

            producer.start();

            for (int i = 1; i <= 5; i++) {

                Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, 0);

                System.out.println(sendResult);
            }
            for (int i = 1; i <= 5; i++) {

                Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, 1);

                System.out.println(sendResult);
            }
            for (int i = 1; i <= 5; i++) {

                Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());

                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, 2);

                System.out.println(sendResult);
            }

            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消費者:設定訊息監聽器為順序訊息監聽器MessageListenerOrderly

public class OrderConsumer {


    public static void main(String[] args) throws MQClientException {


        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
        consumer.setNamesrvAddr("localhost:9876");

        /**
         * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費<br>
         * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicOrderTest", "*");

        /**
         * 實現了MessageListenerOrderly表示一個佇列只會被一個執行緒取到
         *,第二個執行緒無法訪問這個佇列
         */
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                // 設定自動提交
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println(msg + ",內容:" + new String(msg.getBody()));
                }

                try {
                    TimeUnit.SECONDS.sleep(5L);
                } catch (InterruptedException e) {

                    e.printStackTrace();
                }
                ;

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer1 Started.");
    }


}

4、延時訊息

延時訊息,簡單來說就是當 producer 將訊息傳送到 broker 後,會延時一定時間後才投遞給 consumer 進行消費。

RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。

這種訊息一般適用於訊息生產和消費之間有時間視窗要求的場景。比如說我們網購時,下單之後是有一個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那麼在訂單建立的時候就會就需要傳送一條延時訊息(延時15分鐘)後投遞給 consumer,consumer 接收訊息後再對訂單的支付狀態進行判斷是否關閉訂單。

設定延時非常簡單,只需要在Message設定對應的延時級別即可

生產者:

public class DelayProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        (new Date() + "Hello RocketMQ ,QuickStart 11" + i)
                                .getBytes()// body
                );
                //1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。
                // level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推
                msg.setDelayTimeLevel(2);

                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
       producer.shutdown();
    }

}

 

 

參考部落格:

https://my.oschina.net/xinxingegeya/blog/1577410

https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.575.42512de7YsiiZ5

https://www.jianshu.com/p/11e875074a8f