1. 程式人生 > 其它 >rabbitMq延遲佇列實現訂單失敗(訂單過期)

rabbitMq延遲佇列實現訂單失敗(訂單過期)

1.訂單失效原理

訂單失效的實現方式
1:redis的過期特性,redis提供了key過期的監聽事件介面,通過監聽key過期來實現訂單失效,不支援叢集環境(主從結構存在資料副本)
2:使用rabbitMq實現延遲佇列的功能。

  1. 當生成訂單時,將訂單號放入死信佇列(因為沒有訊息處理者,所以稱為死信佇列)設定訊息的存活時間為30分鐘,
  2. 當30分鐘過後,死信佇列的訊息會通過,路由轉發交換機,路由轉發交換機將訊息發給工作佇列
  3. 訊息處理者處理訊息

2.具體實現

  • 匯入相應的jar包
<!--spring整合rabbitMq -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.1.2</version>
</dependency>
<!-- spring整合rabbitMq-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
  • 配置rabbit配置檔案spring-rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- 連線工廠-->
    <!-- 建立連線工廠將來生產連線,工廠名為rabbitConnectionFactory,登陸名為admin,密碼為admin,連線地址為192.168.192.3 ,埠號為5672-->
    <rabbit:connection-factory id="rabbitConnectionFactory" username="admin" password="admin" host="192.168.192.3" port="5672"/>

    <!--以管理員身份使用上面這個連線工廠 -->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    <!-- 死信佇列,佇列名為dead_queue。自動宣告-->
    <rabbit:queue name="dead_queue" auto-declare="true">
        <!-- 定義佇列引數: -->
        <rabbit:queue-arguments>
            <!-- 設定訊息的存活時間  毫秒數-->
            <entry key="x-message-ttl" value="30000" value-type="java.lang.Long"/>
            <!-- 引用死信路由器 -->
            <entry key="x-dead-letter-exchange" value="dead_exchange"/>
            <!-- 設定死信路由鍵 -->
            <entry key="x-dead-letter-routing-key" value="task_queue"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- dead_exchange死信交換機-->
    <rabbit:direct-exchange name="dead_exchange" durable="false" auto-delete="false" id="dead_exchange">
        <!-- 死信理由繫結轉發的佇列-->
        <rabbit:bindings>
            <rabbit:binding queue="task_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 任務佇列-->
    <!-- 消費者所獲取的訊息佇列-->
    <rabbit:queue name="task_queue" auto-declare="true" />
    <!-- 執行緒池-->
    <!-- 消費者消費訊息的任務類
     執行緒池去排程執行緒去執行任務類的方法
     -->
    <!-- 配置執行緒池 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 執行緒池維護執行緒的最少數量 -->
        <property name="corePoolSize" value="5"/>
        <!-- 執行緒池維護執行緒所允許的空閒時間 -->
        <property name="keepAliveSeconds" value="30000"/>
        <!-- 執行緒池維護執行緒的最大數量 -->
        <property name="maxPoolSize" value="2000"/>
        <!-- 執行緒池所使用的緩衝佇列 -->
        <property name="queueCapacity" value="20"/>
    </bean>

    <!-- 監聽程式監聽任務佇列消費-->
    <!-- 配置監聽容器-->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <!-- 指定監聽佇列 使用哪個監聽類, 定義了 收到訊息之後的邏輯 ,來監聽-->
        <rabbit:listener queues="task_queue" ref="delayTask"/>
    </rabbit:listener-container>

    <!-- 訊息消費的任務類-->
    <bean id="delayTask" class="com.oracle.shop.task.OrdersExpireMessageTask"/>

    <!-- 配置生產訊息的客戶端物件,將來發送訊息到dead_queue -->
    <!-- 例項化一個生產者客戶端物件,將來向dead_queue寫訊息
   spring封裝了amqpTempalte
   -->
    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" queue="dead_queue" routing-key="dead_queue"/>
</beans>
  • 編寫處理類
//訂單過期處理
public class OrdersExpireMessageTask implements MessageListener {

    @Autowired
    private OrdersService ordersService;

    //消費者監聽佇列進行處理
    @Override
    public void onMessage(Message message) {
        //訊息過期後,接收並進行處理
        String orderNumber = new String(message.getBody());
        //查詢訂單狀態,如果未支付,則查詢支付寶介面,確認訂單狀態,然後進行操作
        Orders orders = ordersService.selectOrdersByOrdersNum(orderNumber);
        if (orders.getOrdersStatus() == 0) {//未支付
            //通過支付包驗證訂單是否支付
            try {
                AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.URL, AlipayConfig.APPID,
                        AlipayConfig.RSA_PRIVATE_KEY, AlipayConfig.FORMAT, AlipayConfig.CHARSET,
                        AlipayConfig.ALIPAY_PUBLIC_KEY, AlipayConfig.SIGNTYPE);
                //建立查詢請求
                AlipayTradeQueryRequest alipayTradeQueryRequest = new AlipayTradeQueryRequest();
                //設定請求引數
                alipayTradeQueryRequest.setBizContent("{" + "\"out_trade_no\":\"" + orderNumber + "\"" + "}");
                //執行查詢,返回查詢結果
                AlipayTradeQueryResponse alipayTradeQueryResponse = alipayClient.execute(alipayTradeQueryRequest);
                //對查詢結果進行處理
                if (alipayTradeQueryResponse.isSuccess()) {
                    switch (alipayTradeQueryResponse.getTradeStatus()) // 判斷交易結果
                    {
                        case "TRADE_FINISHED": // 交易結束並不可退款,修改訂單狀態為已支付
                            ordersService.updateOrdersStatusPaied(orderNumber);
                            break;
                        case "TRADE_SUCCESS": // 交易支付成功,修改訂單狀態為,已支付
                            ordersService.updateOrdersStatusPaied(orderNumber);
                            break;
                        case "TRADE_CLOSED": // 未付款交易超時關閉或支付完成後全額退款,修改訂單狀態為,未支付
                            ordersService.updateOrdersStatusOverdue(orderNumber);
                            break;
                        case "WAIT_BUYER_PAY": // 交易建立並等待買家付款,修改訂單狀態為,未支付
                            ordersService.updateOrdersStatusOverdue(orderNumber);
                            break;
                        default:
                            break;
                    }
                } else {//支付寶方面沒有該訂單,直接修改訂單資訊為過期
                    ordersService.updateOrdersStatusOverdue(orderNumber);
                }
            } catch (AlipayApiException e) {
                e.printStackTrace();
            }
        }
    }
}