RabbitMQ實現支付訂單延遲訊息
阿新 • • 發佈:2021-02-15
技術標籤:spring boot架構設計
1. RabbitMQ
RabbitMQ是一個被廣泛使用的開源訊息佇列。它是輕量級且易於部署的,它能支援多種訊息協議。RabbitMQ可以部署在分散式和聯合配置中,以滿足高規模、高可用性的需求。
2. RabbitMQ機制
3. 死信佇列
“死信”是RabbitMQ中的一種訊息機制,當你在消費訊息時,如果佇列裡的訊息出現以下情況:
- 訊息被否定確認,使用 channel.basicNack 或 channel.basicReject ,並且此時requeue 屬性被設定為false。
- 訊息在佇列的存活時間超過設定的TTL時間。
- 訊息佇列的訊息數量已經超過最大佇列長度。
4 應用實戰
業務場景說明
用於解決使用者下單以後,訂單超時如何取消訂單的問題。
- 使用者進行下單操作(會有鎖定商品庫存、使用優惠券、積分一系列的操作);
- 生成訂單,獲取訂單的id;
- 獲取到設定的訂單超時時間(假設設定的為60分鐘不支付取消訂單);
- 按訂單超時時間傳送一個延遲訊息給RabbitMQ,讓它在訂單超時後觸發取消訂單的操作;
- 如果使用者沒有支付,進行取消訂單操作(釋放鎖定商品庫存、返還優惠券、返回積分一系列操作)。
pom.xml依賴
<!--訊息佇列相關依賴-- >
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ的配置
- 用於配置交換機、佇列及佇列與交換機的繫結關係。
@Configuration
public class RabbitMqConfig {
/**
* 訂單訊息實際消費佇列所繫結的交換機
*/
@Bean
DirectExchange orderDirect () {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 訂單延遲佇列佇列所繫結的交換機
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 訂單實際消費佇列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/**
* 訂單延遲佇列(死信佇列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期後轉發的交換機
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期後轉發的路由鍵
.build();
}
/**
* 將訂單佇列繫結到交換機
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/**
* 將訂單延遲佇列繫結到交換機
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
}
- mall.order.direct(取消訂單訊息佇列所繫結的交換機):繫結的佇列為mall.order.cancel,一旦有訊息以mall.order.cancel為路由鍵發過來,會發送到此佇列。
- mall.order.direct.ttl(訂單延遲訊息佇列所繫結的交換機):繫結的佇列為mall.order.cancel.ttl,一旦有訊息以mall.order.cancel.ttl為路由鍵傳送過來,會轉發到此佇列,並在此佇列儲存一定時間,等到超時後會自動將訊息傳送到mall.order.cancel(取消訂單訊息消費佇列)。
延遲訊息的傳送者
- 用於向訂單延遲訊息佇列(mall.order.cancel.ttl)裡傳送訊息
@Component
public class CancelOrderSender {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(Long orderId,final long delayTimes){
//給延遲佇列傳送訊息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//給訊息設定延遲毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}
取消訂單訊息的接收者
- 用於從取消訂單的訊息佇列(mall.order.cancel)裡接收訊息。
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired
private OmsPortalOrderService portalOrderService;
@RabbitHandler
public void handle(Long orderId){
LOGGER.info("receive delay message orderId:{}",orderId);
portalOrderService.cancelOrder(orderId);
}
}
業務類
- service
/**
* 前臺訂單管理Service
* Created by macro on 2018/8/30.
*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
@Autowired
private CancelOrderSender cancelOrderSender;
@Override
public CommonResult generateOrder(OrderParam orderParam) {
//todo 執行一系類下單操作,具體參考mall專案
LOGGER.info("process generateOrder");
//下單完成後開啟一個延遲訊息,用於當用戶沒有付款時取消訂單(orderId應該在下單後生成)
sendDelayMessageCancelOrder(11L);
return CommonResult.success(null, "下單成功");
}
@Override
public void cancelOrder(Long orderId) {
//todo 執行一系類取消訂單操作,具體參考mall專案
LOGGER.info("process cancelOrder orderId:{}",orderId);
}
private void sendDelayMessageCancelOrder(Long orderId) {
//獲取訂單超時時間,假設為60分鐘
long delayTimes = 30 * 1000;
//傳送延遲訊息
cancelOrderSender.sendMessage(orderId, delayTimes);
}
}
- controller
@Controller
@Api(tags = "OmsPortalOrderController", description = "訂單管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService portalOrderService;
@ApiOperation("根據購物車資訊生成訂單")
@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
@ResponseBody
public Object generateOrder(@RequestBody OrderParam orderParam) {
return portalOrderService.generateOrder(orderParam);
}
}