1. 程式人生 > >RocketMQ-順序訊息

RocketMQ-順序訊息

github程式碼下載地址

1.RocketMQ有三種訊息

    1)普通訊息

    2)順序訊息

    3)事務訊息

2.順序訊息

    概念:是MQ提供的一種嚴格按照順序進行釋出和消費的訊息型別;因此可以看出,順序訊息由兩部分組成,順序釋出和

順序消費。

3.在MQ中如何保證順序消費

        (1) 訊息傳送是保證是順序的    (2) 訊息被儲存時保證是和傳送的順序一致 (3)訊息被消費時保證和儲存的順序一致

傳送訊息保證是順序的意味著對於有順序的訊息,使用者要保證使用同一個執行緒採用同步的方式傳送;而儲存和傳送的

順序一致則要求在同一個執行緒傳送過來的訊息A和訊息B,儲存時在空間上訊息A一定在訊息B之前;最後消費和儲存的順

序保持一致則要求,在訊息A和訊息B到達Consumer之後必須按照先消費訊息A在消費訊息B的順序執行

對於兩個訂單的訊息的原始資料:a1、b1、b2、a2、a3、b3(絕對時間下發生的順序):

   1) 在傳送時,a訂單的訊息必須保證a1 ,a2 , a3的順序傳送,b訂單的訊息也一樣,但是a,b訂單之間的訊息沒有順序關係;

這意味這a,b訂單的訊息可以通過不同的執行緒傳送出去

   2) 在儲存時,需要分別保證a,b訂單各自訊息的順序,但是a,b訂單之間的訊息可以不保證

   3)在消費時,只要保證每一個分割槽只有一個執行緒來去處理即可當然,如果a、b在一個分割槽中,在收到訊息後也可以將他們拆分到不同執行緒中處理,不過要權衡一下收益

4.生產者程式碼

      1)生產者

package com.roger.order.producer;

import com.roger.order.entity.OrderMsgDTO;
import com.roger.utils.SnowflakeIdWorker;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class OrderMqProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer defaultMQProducer =
                new DefaultMQProducer("orderMQProducerGroup");
        defaultMQProducer.setNamesrvAddr("172.20.10.60:9876");
        defaultMQProducer.start();
        defaultMQProducer.createTopic("OrderTopic", "OrderTopic", 3);

        String[] tags = new String[]{"TagC", "TagP", "TagF"};

        List<OrderMsgDTO> orderMsgList = new ArrayList<>();
        int orderCount = 5;
        for (int i = 0; i < orderCount; i++) {
            long orderId = SnowflakeIdWorker.getInstance().nextId();
            OrderMqProducer.builderOrderMsgList(orderMsgList, orderId);
        }

        for (int i = 0; i < orderMsgList.size(); i++) {
            OrderMsgDTO orderMsgDTO = orderMsgList.get(i);
            String body = orderMsgDTO.toString();
            Message msg = new Message("OrderTopic",
                    tags[i % tags.length],
                    "OrderKey" + i,
                    body.getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {

                //List<MessageQueue> msgQueList 訊息要傳送的Topic下的所有分割槽
                //Message message 訊息物件
                // Object args 額外的引數,使用者可以自己傳遞引數
                // 比如為了把同一個訂單的訊息傳送到同一個分割槽中,
                // 可以把訂單號作為一個引數傳遞過去然後mod分割槽個數,
                // 就可以保證把同一個訂單的訊息傳送到同一個分割槽中去
                @Override
                public MessageQueue select(List<MessageQueue> msgQueList, Message message, Object args) {
                    long orderId = (long) args;
                    long index = orderId % msgQueList.size();
                    return msgQueList.get((int) index);
                }
            }, orderMsgDTO.getOrderId());

            System.out.println(sendResult +
                    String.format("message [%s] send success.",
                            new String(msg.getBody())));
        }
        //defaultMQProducer.shutdown();
    }


    private static void builderOrderMsgList(List<OrderMsgDTO> orderMsgList, long orderId) {
        orderMsgList.add(new OrderMsgDTO(orderId, "Create"));
        orderMsgList.add(new OrderMsgDTO(orderId, "PayOff"));
        orderMsgList.add(new OrderMsgDTO(orderId, "Finish"));
    }
}

2.topic配置資訊

3 執行結果- 5個訂單的訊息分到了三個佇列(queue=0,1,2)中去了

SendResult [sendStatus=SEND_OK, 
            msgId=AC140A051DC818B4AAC2938813480000, 
            offsetMsgId=AC140A3C00002A9F00000000000297A4, 
            messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], 
            queueOffset=3
        ]
message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, 
            msgId=AC140A051DC818B4AAC2938813510001, 
            offsetMsgId=AC140A3C00002A9F000000000002988D, 
            messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], 
            queueOffset=4
        ]
message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, 
            msgId=AC140A051DC818B4AAC2938813C90002, 
            offsetMsgId=AC140A3C00002A9F0000000000029976, 
            messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], 
            queueOffset=5
        ]
message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813EA0003, offsetMsgId=AC140A3C00002A9F0000000000029A5F, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=3]message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813F00004, offsetMsgId=AC140A3C00002A9F0000000000029B48, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=4]message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814000005, offsetMsgId=AC140A3C00002A9F0000000000029C31, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=5]message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814A20006, offsetMsgId=AC140A3C00002A9F0000000000029D1A, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=12]message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815060007, offsetMsgId=AC140A3C00002A9F0000000000029E03, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=13]message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815110008, offsetMsgId=AC140A3C00002A9F0000000000029EEC, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=14]message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815150009, offsetMsgId=AC140A3C00002A9F0000000000029FD5, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881546000A, offsetMsgId=AC140A3C00002A9F000000000002A0BE, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881549000B, offsetMsgId=AC140A3C00002A9F000000000002A1A8, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938819F0000C, offsetMsgId=AC140A3C00002A9F000000000002A292, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881A30000D, offsetMsgId=AC140A3C00002A9F000000000002A37C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938821A0000E, offsetMsgId=AC140A3C00002A9F000000000002A466, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)] send success.

5.消費者程式碼-兩個消費者程式碼相同只是細微的差別

    1.程式碼

package com.roger.order.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class OrderMqConsumer1 {

    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer defaultMQPushConsumer =
                new DefaultMQPushConsumer("orderMQPushConsumerGroup");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.setNamesrvAddr("172.20.10.60:9876");

        defaultMQPushConsumer.subscribe("OrderTopic","*");

        defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            Random r = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgExtList, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.println("當前執行緒名:" + Thread.currentThread().getName() + ";Receive new message:");
                for(MessageExt msgExt : msgExtList){
                    System.out.println(String.format("Consume message [%s],TagName [%s]",
                            new String(msgExt.getBody()),
                            msgExt.getTags()));
                    try {
                        //簡單業務處理邏輯
                        TimeUnit.SECONDS.sleep(r.nextInt(10));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        defaultMQPushConsumer.start();

        System.out.println("OrderMqConsumer1 Started...");
    }
}

   2.OrderMqConsumer1執行結果-

             (a) 消費了queue=0 和queue=1的資料

             (b) 每個訂單按照順序執行,並且是訂單號相同的訊息同一個執行緒執行的

OrderMqConsumer1 Started...
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)],TagName [TagF]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)],TagName [TagF]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)],TagName [TagF]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)],TagName [TagF]



       3.OrderMqConsumer1執行結果-

OrderMqConsumer2 Started...
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)],TagName [TagF]