spring boot:用rocketmq傳送延時訊息用來取消訂單(spring boot 2.3.3)
一,為什麼要用延時訊息來取消訂單?
1,為什麼要取消訂單
在電商的下單過程中,需要在生成訂單時扣減庫存,
但有可能發生這種情況:使用者下了單,臨時改變主意不再支付,
則訂單不能無限期的保留,因為還要把佔用的庫存數量釋放出來,
所以通常會在使用者下單後半小時(或其他時長)把未支付的訂單取消不再保留。
2,取消訂單的方法:
通常我們會在crond中建立一個定時執行的任務,每1分鐘執行一次,
把下單時間超過半小時的取出來,檢查訂單狀態是否還是未支付,
如果仍未支付,則修改訂單狀態為無效,同時把庫存數量加回
這個做法的缺點是資料庫繁忙時會增加資料庫的壓力
3,rocketmq的延時訊息功能可以精準的在指定時間把訊息傳送到消費者,
而無需掃描資料庫,
在這裡我們使用延時訊息來實現取消訂單功能
說明:劉巨集締的架構森林是一個專注架構的部落格,地址:https://www.cnblogs.com/architectforest
對應的原始碼可以訪問這裡獲取:https://github.com/liuhongdi/
說明:作者:劉巨集締 郵箱: [email protected]
二,演示專案的相關資訊
1,專案地址:
https://github.com/liuhongdi/mqdelay
2,專案功能說明
演示了用rocketmq傳送延時訊息
3,專案結構:如圖:
三,配置檔案說明
1,send/pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <!--fastjson begin--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency>
2,receive/pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <!--fastjson begin--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.73</version> </dependency>
說明:兩個模組的pom.xml內容相同
3,receive/application.properties
server.port=8081
說明:兩個模組同時執行時,需要把埠區分開,
send不做修改,使用預設的8080埠
receive這裡指定使用8081埠
四,java程式碼說明
1,send/OrderMsg.java
//傳送的取消訂單資訊 public class OrderMsg { //使用者id private int userId; public int getUserId() { return this.userId; } public void setUserId(int userId) { this.userId = userId; } //訂單sn private String orderSn; public String getOrderSn() { return this.orderSn; } public void setOrderSn(String orderSn) { this.orderSn = orderSn; } }
說明:要取消的訂單的訊息模型,
OrderMsg.java在兩個模組中一致
2,send/RocketConstants.java
public class RocketConstants {//name server,有多個時用分號隔開 public static final String NAME_SERVER = "127.0.0.1:9876"; //topic的名字,應該從服務端先建立好,否則會報錯 public static final String TOPIC = "laoliutest"; }
rocketmq需要用到的name server和topic名字
RocketConstants.java在兩個模組中一致
3,send/Producer.java
//訊息生產者類 @Component public class Producer { private String producerGroup = "order_producer"; private DefaultMQProducer producer; //構造 public Producer(){ //建立生產者 producer = new DefaultMQProducer(producerGroup); //不開啟vip通道 producer.setVipChannelEnabled(false); //設定 name server producer.setNamesrvAddr(RocketConstants.NAME_SERVER); //producer.m start(); } //使producer啟動 public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } //返回producer public DefaultMQProducer getProducer(){ return this.producer; } //進行關閉的方法 public void shutdown(){ this.producer.shutdown(); } }
生產者類
4,send/HomeController.java
@RestController @RequestMapping("/home") public class HomeController { @Autowired private Producer producer; //初始化併發送訊息 @RequestMapping("/send") public String send() throws Exception { int userId = 1; //得到訂單編號: DateTimeFormatter df_year = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"); LocalDateTime date = LocalDateTime.now(); String datestr = date.format(df_year); //訊息,指定使用者id和訂單編號 OrderMsg msg = new OrderMsg(); msg.setUserId(userId); msg.setOrderSn(userId+"_"+datestr); String msgJson = JSON.toJSONString(msg); //生成一個資訊,標籤在這裡手動指定 Message message = new Message(RocketConstants.TOPIC, "carttag", msgJson.getBytes()); //delaytime的值: //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h message.setDelayTimeLevel(5); //傳送資訊 SendResult sendResult = producer.getProducer().send(message); System.out.println("時間:"+ TimeUtil.getTimeNow()+";生產者傳送一條資訊,內容:{"+msgJson+"},結果:{"+sendResult+"}"); return "success"; } }
傳送訊息
注意延遲時間的值5對應1m,所以消費者應該會在1分鐘後才收到訊息
5,receive/Consumer.java
@Component public class Consumer { //消費者實體物件 private DefaultMQPushConsumer consumer; //消費者組 public static final String CONSUMER_GROUP = "order_consumer"; //建構函式 用來例項化物件 public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(RocketConstants.NAME_SERVER); //指定消費模式 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //指定訂閱主題 //指定訂閱標籤,*代表所有標籤 consumer.subscribe(RocketConstants.TOPIC, "*"); //註冊一個消費訊息的Listener //對訊息的消費在這裡實現 //兩種回撥 MessageListenerConcurrently 為普通監聽,MessageListenerOrderly 為順序監聽 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { //遍歷接收到的訊息 try { for (Message msg : msgs) { //得到訊息的body String body = new String(msg.getBody(), "utf-8"); //用json轉成物件 OrderMsg msgOne = JSON.parseObject(body, OrderMsg.class); //列印使用者id System.out.println("訊息:使用者id:"+msgOne.getUserId()); //列印訂單編號 System.out.println("訊息:訂單sn:"+msgOne.getOrderSn()); //列印訊息內容 System.out.println("時間:"+ TimeUtil.getTimeNow()+";消費者已接收到訊息-topic={"+msg.getTopic()+"}, 訊息內容={"+body+"}"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消費者 啟動成功======="); } }
6,其他非關鍵程式碼可檢視github
五,測試效果
1,分別啟動兩個模組
2,訪問:
http://127.0.0.1:8080/home/send
返回:
success
檢視send的控制檯:
時間:2020-09-17 14:56:53.207;生產者傳送一條資訊, 內容:{{"orderSn":"1_20200917145653166","userId":1}}, 結果:{SendResult [sendStatus=SEND_OK, msgId=C0A803D5231F42A57993559ADFC50000, offsetMsgId=7F00000100002A9F0000000000016E7B, messageQueue=MessageQueue [topic=laoliutest, brokerName=broker-a, queueId=0], queueOffset=13]}
注意傳送的時間:2020-09-17 14:56:53.207
檢視receive的控制檯:
訊息:使用者id:1 訊息:訂單sn:1_20200917145653166 時間:2020-09-17 14:57:53.212; 消費者已接收到訊息-topic={laoliutest}, 訊息內容={{"orderSn":"1_20200917145653166","userId":1}}
注意接收到的時間:2020-09-17 14:57:53.212
時長整好是60秒,和我們在程式碼中的設定一致
六,檢視spring boot版本:
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.3.3.RELEASE)