1. 程式人生 > >必知必會的RocketMQ訊息型別

必知必會的RocketMQ訊息型別

普通訊息

普通訊息也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒有保證,可能先發送的訊息先消費,也可能先發送的訊息後消費。

舉個簡單例子,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通訊息。

因為不需要保證訊息的順序,所以訊息可以大規模併發地傳送和消費,吞吐量很高,適合大部分場景。

程式碼示例

  • 生產者
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //宣告並初始化一個producer
        //需要一個producer group名字作為構造方法的引數,這裡為concurrent_producer
        DefaultMQProducer producer = new DefaultMQProducer("concurrent_producer");

        //設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
        //NameServer的地址必須有,但是也可以通過環境變數的方式設定,不一定非得寫死在程式碼裡
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

        //呼叫start()方法啟動一個producer例項
        producer.start();

        //傳送10條訊息到Topic為TopicTest,tag為TagA,訊息內容為“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTestConcurrent",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );

                //呼叫producer的send()方法傳送訊息
                //這裡呼叫的是同步的方式,所以會有返回結果,同時預設傳送的也是普通訊息
                SendResult sendResult = producer.send(msg);

                //列印返回結果,可以看到訊息傳送的狀態以及一些相關資訊
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        //傳送完訊息之後,呼叫shutdown()方法關閉producer
        producer.shutdown();
    }
}
  • 消費者
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        //宣告並初始化一個consumer
        //需要一個consumer group名字作為構造方法的引數,這裡為concurrent_consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrent_consumer");

        //同樣也要設定NameServer地址
        consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

        //這裡設定的是一個consumer的消費策略
        //CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
        //CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
        //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //設定consumer所訂閱的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTestConcurrent", "*");

        //設定一個Listener,主要進行訊息的邏輯處理
        //注意這裡使用的是MessageListenerConcurrently這個介面
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {

                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

                //返回消費狀態
                //CONSUME_SUCCESS 消費成功
                //RECONSUME_LATER 消費失敗,需要稍後重新消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //呼叫start()方法啟動consumer
        consumer.start();

        System.out.println("Consumer Started.");
    }
}

有序訊息

有序訊息就是按照一定的先後順序的訊息型別。

舉個例子來說,producer 依次傳送 order id 為 1、2、3 的訊息到 broker,consumer 接到的訊息順序也就是 1、2、3 ,而不會出現普通訊息那樣的 2、1、3 等情況。

那麼有序訊息是如何保證的呢?我們都知道訊息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那麼要保證訊息的有序,勢必這兩步都是要保證有序的,即要保證訊息是按有序傳送到 broker,broker 也是有序將訊息投遞給 consumer,兩個條件必須同時滿足,缺一不可。
進一步還可以將有序訊息分成

  • 全域性有序訊息
  • 區域性有序訊息

之前我們講過,topic 只是訊息的邏輯分類,內部實現其實是由 queue 組成。當 producer 把訊息傳送到某個 topic 時,預設是會訊息傳送到具體的 queue 上。

全域性有序

舉個例子,producer 傳送 order id 為 1、2、3、4 的四條訊息到 topicA 上,假設 topicA 的 queue 數為 3 個(queue0、queue1、queue2),那麼訊息的分佈可能就是這種情況,id 為 1 的在 queue0,id 為 2 的在 queue1,id 為 3 的在 queue2,id 為 4 的在 queue0。同樣的,consumer 消費時也是按 queue 去消費,這時候就可能出現先消費 1、4,再消費 2、3,和我們的預期不符。那麼我們如何實現 1、2、3、4 的消費順序呢?道理其實很簡單,只需要把訂單 topic 的 queue 數改為 1,如此一來,只要 producer 按照 1、2、3、4 的順序去傳送訊息,那麼 consumer 自然也就按照 1、2、3、4 的順序去消費,這就是全域性有序訊息。

由於一個 topic 只有一個 queue ,即使我們有多個 producer 例項和 consumer 例項也很難提高訊息吞吐量。就好比過獨木橋,大家只能一個挨著一個過去,效率低下。

那麼有沒有吞吐量和有序之間折中的方案呢?其實是有的,就是區域性有序訊息。

區域性有序

我們知道訂單訊息可以再細分為訂單建立、訂單付款、訂單完成等訊息,這些訊息都有相同的 order id。同時,也只有按照訂單建立、訂單付款、訂單完成的順序去消費才符合業務邏輯。但是不同 order id 的訊息是可以並行的,不會影響到業務。這時候就常見做法就是將 order id 進行處理,將 order id 相同的訊息傳送到 topicB 的同一個 queue,假設我們 topicB 有 2 個 queue,那麼我們可以簡單的對 id 取餘,奇數的發往 queue0,偶數的發往 queue1,消費者按照 queue 去消費時,就能保證 queue0 裡面的訊息有序消費,queue1 裡面的訊息有序消費。

由於一個 topic 可以有多個 queue,所以在效能比全域性有序高得多。假設 queue 數是 n,理論上效能就是全域性有序的 n 倍,當然 consumer 也要跟著增加才行。在實際情況中,這種區域性有序訊息是會比全域性有序訊息用的更多。

示例程式碼

  • 生產者
public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 宣告並初始化一個producer
            // 需要一個producer group名字作為構造方法的引數,這裡為ordered_producer
            DefaultMQProducer orderedProducer = new DefaultMQProducer("ordered_producer");

            // 設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
            //NameServer的地址必須有,但是也可以通過環境變數的方式設定,不一定非得寫死在程式碼裡
            orderedProducer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

            // 呼叫start()方法啟動一個producer例項
            orderedProducer.start();

            // 自定義一個tag陣列
            String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};

            // 傳送10條訊息到Topic為TopicTestOrdered,tag為tags陣列按順序取值,
            // key值為“KEY”拼接上i的值,訊息內容為“Hello RocketMQ”拼接上i的值
            for (int i = 0; i < 10; i++) {

                int orderId = i % 10;
                Message msg =
                        new Message("TopicTestOrdered", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                SendResult sendResult = orderedProducer.send(msg, new MessageQueueSelector() {

                    // 選擇傳送訊息的佇列
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

                        // arg的值其實就是orderId
                        Integer id = (Integer) arg;

                        // mqs是佇列集合,也就是topic所對應的所有佇列
                        int index = id % mqs.size();

                        // 這裡根據前面的id對佇列集合大小求餘來返回所對應的佇列
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.println(sendResult);
            }

            orderedProducer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

至於是要實現全域性有序,還是區域性有序,在此示例程式碼中,就取決於 TopicTestOrdered 這個 Topic 的佇列數了。

  • 消費者
public class Consumer {

    public static void main(String[] args) throws MQClientException {

        //宣告並初始化一個consumer
        //需要一個consumer group名字作為構造方法的引數,這裡為concurrent_consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer");

        //同樣也要設定NameServer地址
        consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");

        //這裡設定的是一個consumer的消費策略
        //CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
        //CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
        //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //設定consumer所訂閱的Topic和Tag
        consumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");

        //設定一個Listener,主要進行訊息的邏輯處理
        //注意這裡使用的是MessageListenerOrderly這個介面
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {

                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

                //返回消費狀態
                //SUCCESS 消費成功
                //SUSPEND_CURRENT_QUEUE_A_MOMENT 消費失敗,暫停當前佇列的消費
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        //呼叫start()方法啟動consumer
        consumer.start();

        System.out.println("Consumer Started.");
    }
}

延時訊息

延時訊息,簡單來說就是當 producer 將訊息傳送到 broker 後,會延時一定時間後才投遞給 consumer 進行消費。

RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。

這種訊息一般適用於訊息生產和消費之間有時間視窗要求的場景。比如說我們網購時,下單之後是有一個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那麼在訂單建立的時候就會就需要傳送一條延時訊息(延時15分鐘)後投遞給 consumer,consumer 接收訊息後再對訂單的支付狀態進行判斷是否關閉訂單。

設定延時非常簡單,只需要在Message設定對應的延時級別即可:

Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                // 這裡設定需要延時的等級即可
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);

文章轉載自:https://www.jianshu.com/p/824066d70da8