1. 程式人生 > 其它 >RabbitMQ延時佇列應用場景

RabbitMQ延時佇列應用場景

應用場景

我們系統未付款的訂單,超過一定時間後,需要系統自動取消訂單並釋放佔有物品

常用的方案

就是利用Spring schedule定時任務,輪詢檢查資料庫

但是會消耗系統記憶體,增加了資料庫的壓力、還存在較大的時間誤差

解決:rabbitmq的訊息TTL和死信Exchange結合

介紹

1.何為訊息TTL、死信

死信:對訊息設定的過期時間到了,這個訊息還沒有被消費就認為這個訊息死了,死了的訊息會進入死信交換機(Dead Letter Exchanges)

成為死信的三種條件:

  • 一個訊息被Consumer拒收了,並且reject方法的引數裡requeue是false。也就是說不會被再次放在佇列裡,被其他消費者使用。(basic.reject/ basic.nack)requeue=false
  • 上面的訊息的TTL到了,訊息過期了。
  • 佇列的長度限制滿了。排在前面的訊息會被丟棄或者扔到死信路由上

訊息TTL:訊息的TTL就是訊息的存活時間

RabbitMQ可以對佇列和訊息都設定過期時間,但代表的都是一個意思,只要訊息在設定時間內沒有消費,訊息就死了,就被稱為死信

如果佇列和訊息都設定了過期時間,那麼就取時間最小的,單個訊息的過期時間才是延時佇列的關鍵

2.如何運作

設定佇列過期時間

消費者P會通過一個路由鍵deal.message傳送訊息給X交換機,然後繼續傳送給delay queau佇列,這個佇列比較特殊,設定了過期時間5分鐘過期,還設定了x-dead-letter-exchange

用於指定下一個接收的交換機,訊息過期之後會成為死信直接進入delay.exchange交換機,利用x-dead-letter-routing-key繫結的路由鍵找到下一個佇列,這時候只需要有人監聽這個佇列。

設定訊息過期時間

消費者傳送一個訊息,設定了5分鐘過期時間,最後交給了延時佇列,延時佇列說訊息死了不要亂放,指定了一個死信路由,用於找到下一個佇列的路由鍵,等到五分鐘後伺服器會自動檢查是否過期,過期的話會交給delay.exchange路由,最後再交給delay.message

程式碼模擬

下訂單成功先發動給order-event-exchangeorder-event-exchange

綁定了兩個路由鍵order.create.orderorder.release.order,根據order.create.order路由鍵找到order.delay.queue佇列,這是一個特殊的佇列,上圖所訴,訊息的存活時間為一分鐘,訊息在order.delay.queue佇列中沒人使用變成死信了,交給order-event-exchange交換機,最後通過order.release.order繫結關係找到了order.release.order.queue佇列

@Configuration
public class MyMQConfig {

    //監聽最後一個佇列,獲取那些過期的訂單訊息
    @RabbitListener(queues = "order.release.order.queue")
    public  void  listerner(OrderEntity orderEntity,Channel channel,Message message) throws IOException {
        System.out.println("收到過期訂單資訊"+orderEntity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    //特殊佇列
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");
        arguments.put("x-dead-letter-routing-key", "order.release.order");
        arguments.put("x-message-ttl", 60000);
        Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        return orderDelayQueue;
    }

    //最後接收死信訊息的佇列
    @Bean
    public Queue orderReleaseQueue() {
        return new Queue("order.release.order.queue", true, false, false);
    }

   //事件交換機
    @Bean
    public Exchange orderEventExchange() {
        return new TopicExchange("order-event-exchange", true, false);
    }

    //繫結order.delay.queue佇列和的order-event-exchange交換機的路由鍵
    @Bean
    public Binding orderCreateBingding() {
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
    }

    //繫結order.release.order.queue佇列和的order-event-exchange交換機的路由鍵
    @Bean
    public Binding orderReleaseBingding() {
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
    }
 

}

測試

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @ResponseBody
    @GetMapping("/test/createOrder")
    public String createOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return "ok";
    }

庫存解鎖實際場景

在庫存服務有個stock-event-exchange交換機,如果我們想要解鎖庫存,

1、首先訂單下成功、庫存鎖定成功

2、鎖定成功就要通過stock.locked路由鍵傳送一個訊息給交換機stock-event-exchange,訊息內容包括哪個訂單、哪些商品、多少庫存等等

3、交換機通過繫結關係再發送給延時佇列stock.delay.queue

4、訂單可能需要30分鐘才會自動關閉,50分鐘之後來檢查庫存,就會知道訂單支付沒有

5、50分鐘訊息沒有被訊息,就變為死信,通過stock.release路由鍵繫結關係交給stock-event-exchange交換機

6、stock-event-exchange交換機通過stock.release路由鍵繫結關係找到strock.relelase.stock.queue佇列

7、所有的解鎖庫存服務就監聽這個佇列裡的訊息,只要這個佇列裡訊息能夠到達的都是超時沒有支付訂單的

下單遠端鎖定庫存,然後將倉庫鎖定庫存的資料發給訂單,當在訂單下單失敗時,由於不是分散式事務,訂單回滾,但倉庫不回滾,所以訂單一失敗,就需要通過訂單拿到mq中倉庫傳來的資料通知倉庫解鎖庫存

庫存解鎖場景:

1、下訂單成功,訂單過期沒有支付被系統自動取消或者使用者手動取消,都要解鎖庫存

2、下訂單成功、庫存鎖定成功,但是業務呼叫失敗導致訂單回滾,之前鎖定的庫存就自動解鎖,Seata分散式事務太慢,就要用一段時間後自動解決庫存。

3、訂單失敗,因為鎖庫存失敗有一個商品沒有鎖成功,導致整個鎖庫存服務都回滾,

訊息佇列收到庫存訊息場景

訊息佇列收到訊息之後

  • 如果沒有查到資料庫有鎖定成功的資料,說明庫存鎖失敗了,鎖庫存自動回滾,資料庫查不到記錄無需解鎖
  • 如果查到有資料,就說明庫存鎖定成功了
    • 沒有這個訂單必須解鎖庫存
    • 有訂單,訂單沒人支付失效了才能解鎖庫存

定時關閉訂單實際場景

同上原理類似也是利用死信路由,訂單建立後,預設放入延時佇列,也就是訂單的有效時間,超過這個時間沒有支付或者使用者主動取消都會導致訂單資訊進入order.release.order.queue佇列,最後被釋放