rabbitMq延遲佇列實現訂單失敗(訂單過期)
阿新 • • 發佈:2021-11-06
1.訂單失效原理
訂單失效的實現方式
1:redis的過期特性,redis提供了key過期的監聽事件介面,通過監聽key過期來實現訂單失效,不支援叢集環境(主從結構存在資料副本)
2:使用rabbitMq實現延遲佇列的功能。
- 當生成訂單時,將訂單號放入死信佇列(因為沒有訊息處理者,所以稱為死信佇列)設定訊息的存活時間為30分鐘,
- 當30分鐘過後,死信佇列的訊息會通過,路由轉發交換機,路由轉發交換機將訊息發給工作佇列
- 訊息處理者處理訊息
2.具體實現
- 匯入相應的jar包
<!--spring整合rabbitMq --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.2</version> </dependency> <!-- spring整合rabbitMq--> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>
- 配置rabbit配置檔案spring-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!-- 連線工廠--> <!-- 建立連線工廠將來生產連線,工廠名為rabbitConnectionFactory,登陸名為admin,密碼為admin,連線地址為192.168.192.3 ,埠號為5672--> <rabbit:connection-factory id="rabbitConnectionFactory" username="admin" password="admin" host="192.168.192.3" port="5672"/> <!--以管理員身份使用上面這個連線工廠 --> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!-- 死信佇列,佇列名為dead_queue。自動宣告--> <rabbit:queue name="dead_queue" auto-declare="true"> <!-- 定義佇列引數: --> <rabbit:queue-arguments> <!-- 設定訊息的存活時間 毫秒數--> <entry key="x-message-ttl" value="30000" value-type="java.lang.Long"/> <!-- 引用死信路由器 --> <entry key="x-dead-letter-exchange" value="dead_exchange"/> <!-- 設定死信路由鍵 --> <entry key="x-dead-letter-routing-key" value="task_queue"/> </rabbit:queue-arguments> </rabbit:queue> <!-- dead_exchange死信交換機--> <rabbit:direct-exchange name="dead_exchange" durable="false" auto-delete="false" id="dead_exchange"> <!-- 死信理由繫結轉發的佇列--> <rabbit:bindings> <rabbit:binding queue="task_queue"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 任務佇列--> <!-- 消費者所獲取的訊息佇列--> <rabbit:queue name="task_queue" auto-declare="true" /> <!-- 執行緒池--> <!-- 消費者消費訊息的任務類 執行緒池去排程執行緒去執行任務類的方法 --> <!-- 配置執行緒池 --> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 執行緒池維護執行緒的最少數量 --> <property name="corePoolSize" value="5"/> <!-- 執行緒池維護執行緒所允許的空閒時間 --> <property name="keepAliveSeconds" value="30000"/> <!-- 執行緒池維護執行緒的最大數量 --> <property name="maxPoolSize" value="2000"/> <!-- 執行緒池所使用的緩衝佇列 --> <property name="queueCapacity" value="20"/> </bean> <!-- 監聽程式監聽任務佇列消費--> <!-- 配置監聽容器--> <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto" task-executor="taskExecutor"> <!-- 指定監聽佇列 使用哪個監聽類, 定義了 收到訊息之後的邏輯 ,來監聽--> <rabbit:listener queues="task_queue" ref="delayTask"/> </rabbit:listener-container> <!-- 訊息消費的任務類--> <bean id="delayTask" class="com.oracle.shop.task.OrdersExpireMessageTask"/> <!-- 配置生產訊息的客戶端物件,將來發送訊息到dead_queue --> <!-- 例項化一個生產者客戶端物件,將來向dead_queue寫訊息 spring封裝了amqpTempalte --> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" queue="dead_queue" routing-key="dead_queue"/> </beans>
- 編寫處理類
//訂單過期處理 public class OrdersExpireMessageTask implements MessageListener { @Autowired private OrdersService ordersService; //消費者監聽佇列進行處理 @Override public void onMessage(Message message) { //訊息過期後,接收並進行處理 String orderNumber = new String(message.getBody()); //查詢訂單狀態,如果未支付,則查詢支付寶介面,確認訂單狀態,然後進行操作 Orders orders = ordersService.selectOrdersByOrdersNum(orderNumber); if (orders.getOrdersStatus() == 0) {//未支付 //通過支付包驗證訂單是否支付 try { AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.URL, AlipayConfig.APPID, AlipayConfig.RSA_PRIVATE_KEY, AlipayConfig.FORMAT, AlipayConfig.CHARSET, AlipayConfig.ALIPAY_PUBLIC_KEY, AlipayConfig.SIGNTYPE); //建立查詢請求 AlipayTradeQueryRequest alipayTradeQueryRequest = new AlipayTradeQueryRequest(); //設定請求引數 alipayTradeQueryRequest.setBizContent("{" + "\"out_trade_no\":\"" + orderNumber + "\"" + "}"); //執行查詢,返回查詢結果 AlipayTradeQueryResponse alipayTradeQueryResponse = alipayClient.execute(alipayTradeQueryRequest); //對查詢結果進行處理 if (alipayTradeQueryResponse.isSuccess()) { switch (alipayTradeQueryResponse.getTradeStatus()) // 判斷交易結果 { case "TRADE_FINISHED": // 交易結束並不可退款,修改訂單狀態為已支付 ordersService.updateOrdersStatusPaied(orderNumber); break; case "TRADE_SUCCESS": // 交易支付成功,修改訂單狀態為,已支付 ordersService.updateOrdersStatusPaied(orderNumber); break; case "TRADE_CLOSED": // 未付款交易超時關閉或支付完成後全額退款,修改訂單狀態為,未支付 ordersService.updateOrdersStatusOverdue(orderNumber); break; case "WAIT_BUYER_PAY": // 交易建立並等待買家付款,修改訂單狀態為,未支付 ordersService.updateOrdersStatusOverdue(orderNumber); break; default: break; } } else {//支付寶方面沒有該訂單,直接修改訂單資訊為過期 ordersService.updateOrdersStatusOverdue(orderNumber); } } catch (AlipayApiException e) { e.printStackTrace(); } } } }