使用RabbitMQ實現延時任務
一、場景
查詢支付結果。由於支付系統的複雜性,客戶支付後錢款可能無法實時到賬。此時就需要延時任務輪詢查詢支付結果。
類似此類無法直接實時獲取結果的場景下,都可以使用延時任務完成結果狀態的查詢。
二、方案
普通交換器+死信交換器。根據延時需求設定訊息過期時間,訊息過期進入死信佇列,消費者監聽死信佇列,實現延時任務。
三、概念
要理解普通交換器+私信交換器實現延時任務的原理,首先要了解RabbitMQ的訊息存活時間TTL、死信交換機的概念。
1.訊息存活時間Time to Live(TTL)
RabbitMQ可以對佇列和訊息分別設定TTL。對佇列設定就是佇列沒有消費者連線的保留時間,也可以對每一個單獨的訊息做單獨的設定。如果佇列和訊息都設定了TTL,那麼會取較小的時間。
當佇列中的訊息存留時間超過了配置的生存時間(TTL),則稱該訊息已死亡。注意,同一個訊息被路由到不同的佇列將擁有不同的過期時間,又或者永遠不會過期。這取決於訊息所存在的佇列。一個佇列中的死亡訊息不會影響到其他佇列中與之相同訊息的生命週期。
所以一個訊息如果被路由到不同的佇列中,這個訊息死亡的時間有可能不一樣(不同的佇列設定)。這裡單講單個訊息的TTL,因為它才是實現延遲任務的關鍵。
可以通過訊息的expiration過期時間欄位設定訊息的TTL
1 rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> {2 // 設定過期時間為期望的延時時間,到期後進入死信佇列,實現延時任務 3 message.getMessageProperties().setExpiration("10000"); 4 return message; 5 });
上面這個訊息設定了過期時間為10000ms,所以10秒後如果沒有被消費,則該訊息已死亡,即“死信”。單靠死信還無法完成延時任務,還需要死信交換機Dead Letter Exchanges.
2. 死信交換機Dead Letter Exchanges
佇列中的訊息可能會變成死信訊息(dead-lettered),進而當以下幾個事件任意一個發生時,訊息將會被重新發送到一個交換機:
- 訊息被消費者使用basic.reject或basic.nack方法並且requeue引數值設定為false的方式進行訊息確認(negatively acknowledged)
- 訊息由於訊息有效期(per-message TTL)過期
- 訊息由於佇列超過其長度限制而被丟棄
注意,佇列的有效期並不會導致其中的訊息過期。
死信交換機(DLXs)就是普通的交換機,可以是任何一種型別,也可以用普通常用的方式進行宣告。
給佇列設定死信交換機時,可以在宣告佇列時使用可選引數"x-dead-letter-exchange"進行宣告配置。該引數值必須是與佇列在同一個虛擬主機的交換機名稱。注意,並不要求在宣告佇列時死信交換機必須已經被宣告,但是當訊息需要死信路由時,該交換機必須存在,否則,訊息將會被丟棄。
也可以指定一個路由關鍵字在死信路由時使用,如果沒有設定,那麼就會使用訊息自身原來的路由關鍵字。
1 /** 2 * 延時佇列 3 * @return 4 */ 5 @Bean 6 public Queue delayQueue() { 7 Map<String, Object> params = new HashMap<>(); 8 params.put("x-dead-letter-exchange", exchangeName); 9 params.put("x-dead-letter-routing-key", routingKeyName); 10 return new Queue(delayQueueName, true, false, false, params); 11 }
上述程式碼通過兩個可選引數給佇列delayQueueName設定了死信交換機和死信路由鍵,也就是說,當佇列delayQueueName發生死信時,將會根據配置的死信交換機和死信路由鍵把死信轉發到目標佇列中。消費者監聽這個死信佇列,即可實現延時任務。
四、程式碼實現
1. 這段示例程式碼實現的是延時喚醒操作。生產者傳送喚醒訊息,消費者監聽死信佇列中的喚醒訊息,如果喚醒次數小於6,就再次傳送喚醒訊息,等待下次喚醒。當喚醒次數為4時,消費者被喚醒,不再發送訊息。
2. 程式碼結構:
- SendWakeUpMsgReqDTO:傳送訊息的引數
- RabbitMqConfig:MQ配置
- RabbitMqProviderImpl:訊息生產者
- RabbitMqConsumerImpl:訊息消費者
- AsyncTaskServiceImpl:非同步呼叫 傳送訊息
SendWakeUpMsgReqDTO:
1 import lombok.Data; 2 3 import java.io.Serializable; 4 5 /** 6 * @Author Nemo Wang 7 * @Date 2021/6/17 19:46 8 * @Description 傳送訊息的引數 9 */ 10 @Data 11 public class SendWakeUpMsgReqDTO implements Serializable { 12 private static final long serialVersionUID = 6298050708365621926L; 13 14 /** 15 * 來源 16 */ 17 private String sourceName; 18 19 /** 20 * 傳送次數 21 */ 22 private int checkTimes; 23 24 /** 25 * 延時時間 26 */ 27 private String delayTime; 28 29 }傳送訊息的引數
RabbitMqConfig:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/17 17:40 4 * @Description RabbitMQ配置類 5 */ 6 @Configuration 7 public class RabbitMqConfig { 8 9 @Value("${mq.delayQueueName}") 10 private String delayQueueName; 11 @Value("${mq.delayExchangeName}") 12 private String delayExchangeName; 13 @Value("${mq.delayRoutingKeyName}") 14 private String delayRoutingKeyName; 15 16 @Value("${mq.queueName}") 17 private String queueName; 18 @Value("${mq.exchangeName}") 19 private String exchangeName; 20 @Value("${mq.routingKeyName}") 21 private String routingKeyName; 22 23 /** 24 * 延時佇列 25 * @return 26 */ 27 @Bean 28 public Queue delayQueue() { 29 Map<String, Object> params = new HashMap<>(); 30 params.put("x-dead-letter-exchange", exchangeName); 31 params.put("x-dead-letter-routing-key", routingKeyName); 32 return new Queue(delayQueueName, true, false, false, params); 33 } 34 35 /** 36 * 普通執行佇列 37 * @return 38 */ 39 @Bean 40 public Queue processQueue() { 41 return new Queue(queueName, true); 42 } 43 44 /** 45 * 延時交換機 46 * @return 47 */ 48 @Bean 49 public DirectExchange delayEchange() { 50 return new DirectExchange(delayExchangeName, true, false); 51 } 52 /** 53 * 普通執行交換機 54 * @return 55 */ 56 @Bean 57 public DirectExchange processEchange() { 58 return new DirectExchange(exchangeName, true, false); 59 } 60 61 /** 62 * 繫結 將延時佇列和延時交換機繫結, 並設定用於匹配鍵:delayRoutingKeyName 63 * @return 64 */ 65 @Bean 66 public Binding delayBinding() { 67 return BindingBuilder.bind(delayQueue()) 68 .to(delayEchange()) 69 .with(delayRoutingKeyName); 70 } 71 72 /** 73 * 普通佇列和普通交換機繫結 並設定用於匹配鍵routingKeyName 74 * @return 75 */ 76 @Bean 77 public Binding processBinding() { 78 return BindingBuilder.bind(processQueue()) 79 .to(processEchange()) 80 .with(routingKeyName); 81 } 82 }RabbitMQ配置類
RabbitMqProviderImpl:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/17 19:52 4 * @Description 訊息生產者 5 */ 6 @Slf4j 7 @Component 8 public class RabbitMqProviderImpl implements RabbitMqProvider { 9 10 @Value("${mq.delayExchangeName}") 11 private String delayExchangeName; 12 @Value("${mq.delayRoutingKeyName}") 13 private String delayRoutingKeyName; 14 15 @Autowired 16 private RabbitTemplate rabbitTemplate; 17 18 /** 19 * 傳送喚醒訊息 20 * @param reqDTO 21 */ 22 @Override 23 public void sendWakeUpMsg(SendWakeUpMsgReqDTO reqDTO) { 24 log.info("Enter RabbmitMqProviderImpl.sendWakeUpMsg reqDTO={}", reqDTO); 25 rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> { 26 // 設定過期時間為期望的延時時間,到期後進入死信佇列,實現延時任務 27 message.getMessageProperties().setExpiration(reqDTO.getDelayTime()); 28 return message; 29 }); 30 } 31 }訊息生產者
RabbitMqConsumerImpl:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/17 22:17 4 * @Description 訊息消費者 5 */ 6 @Slf4j 7 @Component 8 public class RabbitMqConsumerImpl implements RabbitMqConsumer { 9 10 @Autowired 11 private AsyncTaskService asyncTaskService; 12 13 /** 14 * 監聽死信佇列中的訊息 15 * 延時任務佇列中的訊息到期後進入死信佇列。此處監聽死信佇列訊息,實現延時任務 16 * 每次監聽到訊息時,調起該方法。判斷cnt是否為4,如果==4,喚醒返回;如果!=4,繼續進入mq佇列,等待下次喚醒 17 * @param reqDTO 18 */ 19 @RabbitListener(queues = "${mq.queueName}") 20 @Override 21 public void waitingForWakeUp(SendWakeUpMsgReqDTO reqDTO) { 22 log.info("Enter RabbitMqConsumerImpl.waitingForWakeUp reqDTO={}", reqDTO); 23 24 log.info("Consumer 正在睡眠:[{}]. 等待被喚醒", reqDTO.getCheckTimes()); 25 26 if (4 == reqDTO.getCheckTimes()) { 27 log.info("Consumer 已被喚醒."); 28 return; 29 } 30 31 if (reqDTO.getCheckTimes() < 6) { 32 // 再次進入mq佇列 最多喚醒6次 33 sendMsg(reqDTO); 34 } 35 } 36 37 private void sendMsg(SendWakeUpMsgReqDTO reqDTO) { 38 // 進入mq佇列次數 39 reqDTO.setCheckTimes(reqDTO.getCheckTimes() + 1); 40 // 設定延時 5秒 41 reqDTO.setDelayTime("5000"); 42 log.info("RabbitMqConsumerImpl.waitingForWakeUp.sendMsg 再次進入mq佇列 reqDTO.getCheckTimes()={}", reqDTO.getCheckTimes()); 43 asyncTaskService.sendAsyncMqWakeup(reqDTO); 44 } 45 }訊息消費者
AsyncTaskServiceImpl:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/19 19:22 4 * @Description 非同步呼叫 傳送訊息 5 */ 6 @Slf4j 7 @Component 8 public class AsyncTaskServiceImpl implements AsyncTaskService { 9 10 @Autowired 11 private RabbitMqProvider rabbitMqProvider; 12 13 @Async("asyncThreadPoolTaskExecutor") 14 @Override 15 public void sendAsyncMqWakeup(SendWakeUpMsgReqDTO reqDTO) { 16 log.info("AsyncTaskServiceImpl.sendAsyncMqWakeup reqDTO={}", reqDTO); 17 rabbitMqProvider.sendWakeUpMsg(reqDTO); 18 } 19 }非同步呼叫 傳送訊息