1. 程式人生 > 其它 >RabbitMQ實現支付訂單延遲訊息

RabbitMQ實現支付訂單延遲訊息

技術標籤:spring boot架構設計

1. RabbitMQ

RabbitMQ是一個被廣泛使用的開源訊息佇列。它是輕量級且易於部署的,它能支援多種訊息協議。RabbitMQ可以部署在分散式和聯合配置中,以滿足高規模、高可用性的需求。

2. RabbitMQ機制

rabbitMQ機制

3. 死信佇列

“死信”是RabbitMQ中的一種訊息機制,當你在消費訊息時,如果佇列裡的訊息出現以下情況:

  • 訊息被否定確認,使用 channel.basicNack 或 channel.basicReject ,並且此時requeue 屬性被設定為false。
  • 訊息在佇列的存活時間超過設定的TTL時間。
  • 訊息佇列的訊息數量已經超過最大佇列長度。
    “死信”訊息會被RabbitMQ進行特殊處理,如果配置了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有配置,則該訊息將會被丟棄。

4 應用實戰

業務場景說明

用於解決使用者下單以後,訂單超時如何取消訂單的問題。

  • 使用者進行下單操作(會有鎖定商品庫存、使用優惠券、積分一系列的操作);
  • 生成訂單,獲取訂單的id;
  • 獲取到設定的訂單超時時間(假設設定的為60分鐘不支付取消訂單);
  • 按訂單超時時間傳送一個延遲訊息給RabbitMQ,讓它在訂單超時後觸發取消訂單的操作;
  • 如果使用者沒有支付,進行取消訂單操作(釋放鎖定商品庫存、返還優惠券、返回積分一系列操作)。

pom.xml依賴

<!--訊息佇列相關依賴--
> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

RabbitMQ的配置

  • 用於配置交換機、佇列及佇列與交換機的繫結關係。
@Configuration
public class RabbitMqConfig {

    /**
     * 訂單訊息實際消費佇列所繫結的交換機
     */
    @Bean
    DirectExchange orderDirect
() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 訂單延遲佇列佇列所繫結的交換機 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 訂單實際消費佇列 */ @Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 訂單延遲佇列(死信佇列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期後轉發的交換機 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期後轉發的路由鍵 .build(); } /** * 將訂單佇列繫結到交換機 */ @Bean Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /** * 將訂單延遲佇列繫結到交換機 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }
  • mall.order.direct(取消訂單訊息佇列所繫結的交換機):繫結的佇列為mall.order.cancel,一旦有訊息以mall.order.cancel為路由鍵發過來,會發送到此佇列。
  • mall.order.direct.ttl(訂單延遲訊息佇列所繫結的交換機):繫結的佇列為mall.order.cancel.ttl,一旦有訊息以mall.order.cancel.ttl為路由鍵傳送過來,會轉發到此佇列,並在此佇列儲存一定時間,等到超時後會自動將訊息傳送到mall.order.cancel(取消訂單訊息消費佇列)。

延遲訊息的傳送者

  • 用於向訂單延遲訊息佇列(mall.order.cancel.ttl)裡傳送訊息
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //給延遲佇列傳送訊息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //給訊息設定延遲毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}

取消訂單訊息的接收者

  • 用於從取消訂單的訊息佇列(mall.order.cancel)裡接收訊息。
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}

業務類

  • service
/**
 * 前臺訂單管理Service
 * Created by macro on 2018/8/30.
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
    private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 執行一系類下單操作,具體參考mall專案
        LOGGER.info("process generateOrder");
        //下單完成後開啟一個延遲訊息,用於當用戶沒有付款時取消訂單(orderId應該在下單後生成)
        sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下單成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 執行一系類取消訂單操作,具體參考mall專案
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
        //獲取訂單超時時間,假設為60分鐘
        long delayTimes = 30 * 1000;
        //傳送延遲訊息
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }

}
  • controller
@Controller
@Api(tags = "OmsPortalOrderController", description = "訂單管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
    @Autowired
    private OmsPortalOrderService portalOrderService;

    @ApiOperation("根據購物車資訊生成訂單")
    @RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
    @ResponseBody
    public Object generateOrder(@RequestBody OrderParam orderParam) {
        return portalOrderService.generateOrder(orderParam);
    }
}