RocketMQ-順序訊息
github程式碼下載地址
1.RocketMQ有三種訊息
1)普通訊息
2)順序訊息
3)事務訊息
2.順序訊息
概念:是MQ提供的一種嚴格按照順序進行釋出和消費的訊息型別;因此可以看出,順序訊息由兩部分組成,順序釋出和
順序消費。
3.在MQ中如何保證順序消費
(1) 訊息傳送是保證是順序的 (2) 訊息被儲存時保證是和傳送的順序一致 (3)訊息被消費時保證和儲存的順序一致
傳送訊息保證是順序的意味著對於有順序的訊息,使用者要保證使用同一個執行緒採用同步的方式傳送;而儲存和傳送的
順序一致則要求在同一個執行緒傳送過來的訊息A和訊息B,儲存時在空間上訊息A一定在訊息B之前;最後消費和儲存的順
序保持一致則要求,在訊息A和訊息B到達Consumer之後必須按照先消費訊息A在消費訊息B的順序執行
對於兩個訂單的訊息的原始資料:a1、b1、b2、a2、a3、b3(絕對時間下發生的順序):
1) 在傳送時,a訂單的訊息必須保證a1 ,a2 , a3的順序傳送,b訂單的訊息也一樣,但是a,b訂單之間的訊息沒有順序關係;
這意味這a,b訂單的訊息可以通過不同的執行緒傳送出去
2) 在儲存時,需要分別保證a,b訂單各自訊息的順序,但是a,b訂單之間的訊息可以不保證
3)在消費時,只要保證每一個分割槽只有一個執行緒來去處理即可當然,如果a、b在一個分割槽中,在收到訊息後也可以將他們拆分到不同執行緒中處理,不過要權衡一下收益
4.生產者程式碼
1)生產者
package com.roger.order.producer; import com.roger.order.entity.OrderMsgDTO; import com.roger.utils.SnowflakeIdWorker; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.ArrayList; import java.util.List; public class OrderMqProducer { public static void main(String[] args) throws Exception { DefaultMQProducer defaultMQProducer = new DefaultMQProducer("orderMQProducerGroup"); defaultMQProducer.setNamesrvAddr("172.20.10.60:9876"); defaultMQProducer.start(); defaultMQProducer.createTopic("OrderTopic", "OrderTopic", 3); String[] tags = new String[]{"TagC", "TagP", "TagF"}; List<OrderMsgDTO> orderMsgList = new ArrayList<>(); int orderCount = 5; for (int i = 0; i < orderCount; i++) { long orderId = SnowflakeIdWorker.getInstance().nextId(); OrderMqProducer.builderOrderMsgList(orderMsgList, orderId); } for (int i = 0; i < orderMsgList.size(); i++) { OrderMsgDTO orderMsgDTO = orderMsgList.get(i); String body = orderMsgDTO.toString(); Message msg = new Message("OrderTopic", tags[i % tags.length], "OrderKey" + i, body.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() { //List<MessageQueue> msgQueList 訊息要傳送的Topic下的所有分割槽 //Message message 訊息物件 // Object args 額外的引數,使用者可以自己傳遞引數 // 比如為了把同一個訂單的訊息傳送到同一個分割槽中, // 可以把訂單號作為一個引數傳遞過去然後mod分割槽個數, // 就可以保證把同一個訂單的訊息傳送到同一個分割槽中去 @Override public MessageQueue select(List<MessageQueue> msgQueList, Message message, Object args) { long orderId = (long) args; long index = orderId % msgQueList.size(); return msgQueList.get((int) index); } }, orderMsgDTO.getOrderId()); System.out.println(sendResult + String.format("message [%s] send success.", new String(msg.getBody()))); } //defaultMQProducer.shutdown(); } private static void builderOrderMsgList(List<OrderMsgDTO> orderMsgList, long orderId) { orderMsgList.add(new OrderMsgDTO(orderId, "Create")); orderMsgList.add(new OrderMsgDTO(orderId, "PayOff")); orderMsgList.add(new OrderMsgDTO(orderId, "Finish")); } }
2.topic配置資訊
3 執行結果- 5個訂單的訊息分到了三個佇列(queue=0,1,2)中去了
SendResult [sendStatus=SEND_OK,
msgId=AC140A051DC818B4AAC2938813480000,
offsetMsgId=AC140A3C00002A9F00000000000297A4,
messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0],
queueOffset=3
]
message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK,
msgId=AC140A051DC818B4AAC2938813510001,
offsetMsgId=AC140A3C00002A9F000000000002988D,
messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0],
queueOffset=4
]
message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK,
msgId=AC140A051DC818B4AAC2938813C90002,
offsetMsgId=AC140A3C00002A9F0000000000029976,
messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0],
queueOffset=5
]
message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813EA0003, offsetMsgId=AC140A3C00002A9F0000000000029A5F, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=3]message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938813F00004, offsetMsgId=AC140A3C00002A9F0000000000029B48, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=4]message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814000005, offsetMsgId=AC140A3C00002A9F0000000000029C31, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=5]message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938814A20006, offsetMsgId=AC140A3C00002A9F0000000000029D1A, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=12]message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815060007, offsetMsgId=AC140A3C00002A9F0000000000029E03, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=13]message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815110008, offsetMsgId=AC140A3C00002A9F0000000000029EEC, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=14]message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938815150009, offsetMsgId=AC140A3C00002A9F0000000000029FD5, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881546000A, offsetMsgId=AC140A3C00002A9F000000000002A0BE, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881549000B, offsetMsgId=AC140A3C00002A9F000000000002A1A8, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=0], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938819F0000C, offsetMsgId=AC140A3C00002A9F000000000002A292, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=6]message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC293881A30000D, offsetMsgId=AC140A3C00002A9F000000000002A37C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=7]message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)] send success.
SendResult [sendStatus=SEND_OK, msgId=AC140A051DC818B4AAC2938821A0000E, offsetMsgId=AC140A3C00002A9F000000000002A466, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=8]message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)] send success.
5.消費者程式碼-兩個消費者程式碼相同只是細微的差別
1.程式碼
package com.roger.order.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class OrderMqConsumer1 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer defaultMQPushConsumer =
new DefaultMQPushConsumer("orderMQPushConsumerGroup");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.setNamesrvAddr("172.20.10.60:9876");
defaultMQPushConsumer.subscribe("OrderTopic","*");
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
Random r = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgExtList, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.println("當前執行緒名:" + Thread.currentThread().getName() + ";Receive new message:");
for(MessageExt msgExt : msgExtList){
System.out.println(String.format("Consume message [%s],TagName [%s]",
new String(msgExt.getBody()),
msgExt.getTags()));
try {
//簡單業務處理邏輯
TimeUnit.SECONDS.sleep(r.nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
defaultMQPushConsumer.start();
System.out.println("OrderMqConsumer1 Started...");
}
}
2.OrderMqConsumer1執行結果-
(a) 消費了queue=0 和queue=1的資料
(b) 每個訂單按照順序執行,並且是訂單號相同的訊息同一個執行緒執行的
OrderMqConsumer1 Started...
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698048, msgType=Finish)],TagName [TagF]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620921503744, msgType=Finish)],TagName [TagF]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698051, msgType=Finish)],TagName [TagF]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_2;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698050, msgType=Finish)],TagName [TagF]
3.OrderMqConsumer1執行結果-
OrderMqConsumer2 Started...
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Create)],TagName [TagC]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=PayOff)],TagName [TagP]
當前執行緒名:ConsumeMessageThread_1;Receive new message:
Consume message [OrderMsgDTO(orderId=517724620925698049, msgType=Finish)],TagName [TagF]