SpringBoot | 第三十八章:基於RabbitMQ實現訊息延遲佇列方案
前言
前段時間在編寫通用的訊息通知服務時,由於需要實現類似通知失敗時,需要延後幾分鐘再次進行傳送,進行多次嘗試後,進入定時傳送機制。此機制,在原先對接銀聯支付時,銀聯的非同步通知也是類似的,在第一次通知失敗後,支付標準服務會重發,最多傳送五次,每次的間隔時間為1、4、8、16分鐘等。本文就簡單講解下使用RabbitMQ實現延時訊息佇列功能。
- 前言
- 一點知識
- 延時佇列使用場景
- RabbitMQ實現延時佇列
- 存活時間(Time-To-Live 簡稱 TTL)
- 死信交換(Dead Letter Exchanges 簡稱 DLX)
- SpringBoot整合RabbitMQ實現延時佇列實戰
- 一些最佳實踐
- 參考資料
- 總結
- 最後
- 老生常談
一點知識
在此之前,簡單說明下基於RabbitMQ實現延時佇列的相關知識及說明下延時佇列的使用場景。
延時佇列使用場景
在很多的業務場景中,延時佇列可以實現很多功能,此類業務中,一般上是非實時的,需要延遲處理的,需要進行重試補償的。
- 訂單超時關閉:在支付場景中,一般上訂單在建立後30分鐘或1小時內未支付的,會自動取消訂單。
- 簡訊或者郵件通知:在一些註冊或者下單業務時,需要在1分鐘或者特定時間後進行簡訊或者郵件傳送相關資料的。本身此類業務於主業務是無關聯性的,一般上的做法是進行非同步傳送。
- 重試場景:比如訊息通知,在第一次通知出現異常時,會在隔幾分鐘之後進行再次重試傳送。
RabbitMQ實現延時佇列
本身在
RabbitMQ
中是未直接提供延時佇列功能的,但可以使用TTL(Time-To-Live,存活時間)
和DLX(Dead-Letter-Exchange,死信佇列交換機)
的特性實現延時佇列的功能。
存活時間(Time-To-Live 簡稱 TTL)
RabbitMQ
中可以對佇列和訊息分別設定TTL,TTL表明了一條訊息可在佇列中存活的最大時間。當某條訊息被設定了TTL或者當某條訊息進入了設定了TTL的佇列時,這條訊息會在TTL時間後死亡
成為Dead Letter
。如果既配置了訊息的TTL,又配置了佇列的TTL,那麼較小的那個值會被取用。
死信交換(Dead Letter Exchanges 簡稱 DLX)
上個知識點也提到了,設定了
TTL
的訊息或佇列最終會成為Dead Letter
,當訊息在一個佇列中變成死信之後,它能被重新發送到另一個交換機中,這個交換機就是DLX,繫結此DLX的佇列就是死信佇列。
一個訊息變成死信一般上是由於以下幾種情況;
- 訊息被拒絕
- 訊息過期
- 佇列達到了最大長度。
所以,通過TTL
和DLX
的特性可以模擬實現延時佇列的功能。當佇列中的訊息超時成為死信後,會把訊息死信重新發送到配置好的交換機中,然後分發到真實的消費佇列。故簡單來說,我們可以建立2個佇列,一個佇列用於傳送訊息,一個佇列用於訊息過期後的轉發的目標佇列。
SpringBoot整合RabbitMQ實現延時佇列實戰
以下使用
SpringBoot
整合RabbitMQ
進行實戰說明,在進行http
訊息通知時,若通知失敗(地址不可用或者連線超時)時,將此訊息轉入延時佇列中,待特定時間後進行重新發送。
0.引入pom依賴
<!-- rabbit -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 簡化http操作 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-http</artifactId>
<version>4.5.16</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-json</artifactId>
<version>4.5.16</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
1.編寫rabbitmq
配置檔案(關鍵配置)
RabbitConfig.java
/**
*
* @ClassName 類名:RabbitConfig
* @Description 功能說明:
* <p>
* TODO
*</p>
************************************************************************
* @date 建立日期:2019年7月17日
* @author 建立人:oKong
* @version 版本號:V1.0
*<p>
***************************修訂記錄*************************************
*
* 2019年7月17日 oKong 建立該類功能。
*
***********************************************************************
*</p>
*/
@Configuration
public class RabbitConfig {
@Autowired
ConnectionFactory connectionFactory;
/**
* 消費者執行緒數 設定大點 大概率是能通知到的
*/
@Value("${http.notify.concurrency:50}")
int concurrency;
/**
* 延遲佇列的消費者執行緒數 可設定小點
*/
@Value("${http.notify.delay.concurrency:20}")
int delayConcurrency;
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory);
}
@Bean
public DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) {
//durable 是否持久化
//autoDelete 是否自動刪除,即服務端或者客服端下線後 交換機自動刪除
DirectExchange directExchange = new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,true,false);
directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
return directExchange;
}
//設定訊息佇列
@Bean
public Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) {
/*
建立接收佇列,4個引數
name - 佇列名稱
durable - false,不進行持有化
exclusive - true,獨佔性
autoDelete - true,自動刪除*/
Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
return queue;
}
//佇列繫結交換機
@Bean
public Binding bindingStartQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) {
Binding binding = BindingBuilder.bind(httpMessageStartQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_START_RK);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}
@Bean
public Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
return queue;
}
@Bean
public Binding bindingOneQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) {
Binding binding = BindingBuilder.bind(httpMessageOneQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_ONE_RK);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}
//-------------設定延遲佇列--開始--------------------
@Bean
public Queue httpDelayOneQueue() {
//name - 佇列名稱
//durable - true
//exclusive - false
//autoDelete - false
return QueueBuilder.durable("http.message.dlx.one")
//以下是重點:當變成死信佇列時,會轉發至 路由為x-dead-letter-exchange及x-dead-letter-routing-key的佇列中
.withArgument("x-dead-letter-exchange", ApplicationConstant.HTTP_MESSAGE_EXCHANGE)
.withArgument("x-dead-letter-routing-key", ApplicationConstant.HTTP_MESSAGE_ONE_RK)
.withArgument("x-message-ttl", 1*60*1000)//1分鐘 過期時間(單位:毫秒),當過期後 會變成死信佇列,之後進行轉發
.build();
}
//繫結到交換機上
@Bean
public Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin, DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) {
Binding binding = BindingBuilder.bind(httpDelayOneQueue).to(httpMessageNotifyDirectExchange).with("delay.one");
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}
//-------------設定延遲佇列--結束--------------------
//建議將正常的佇列和延遲處理的佇列分開
//設定監聽容器
@Bean("notifyListenerContainer")
public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手動ack
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1);
factory.setConcurrentConsumers(concurrency);
return factory;
}
// 設定監聽容器
@Bean("delayNotifyListenerContainer")
public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手動ack
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1);
factory.setConcurrentConsumers(delayConcurrency);
return factory;
}
}
ApplicationConstant.java
public class ApplicationConstant {
/**
* 傳送http通知的 exchange 佇列
*/
public static final String HTTP_MESSAGE_EXCHANGE = "http.message.exchange";
/**
* 配置訊息佇列和路由key值
*/
public static final String HTTP_MESSAGE_START_QUEUE_NAME = "http.message.start";
public static final String HTTP_MESSAGE_START_RK = "rk.start";
public static final String HTTP_MESSAGE_ONE_QUEUE_NAME = "http.message.one";
public static final String HTTP_MESSAGE_ONE_RK = "rk.one";
/**
* 通知佇列對應的延遲佇列關係,即過期佇列之後傳送到下一個的佇列資訊,可以根據實際情況新增,當然也可以根據一定規則自動生成
*/
public static final Map<String,String> delayRefMap = new HashMap<String, String>() {
/**
*
*/
private static final long serialVersionUID = -779823216035682493L;
{
put(HTTP_MESSAGE_START_QUEUE_NAME, "delay.one");
}
};
}
簡單來說,就是建立一個正常訊息傳送佇列,用於接收http訊息請求的引數,同時進行http請求。同時,建立一個延時佇列,設定其x-dead-letter-exchange
、x-dead-letter-routing-key
和x-message-ttl
值,將其轉發到正常的佇列中。使用一個map物件維護一個關係,當正常訊息異常時,需要傳送的延時佇列的佇列名稱,當然時間場景彙總,根據需要可以進行動態配置或者根據一定規則進行動態對映。
2.建立監聽類,用於訊息的消費操作,此處使用@RabbitListener
來消費訊息(當然也可以使用SimpleMessageListenerContainer
進行訊息配置的),建立了一個正常訊息監聽和延時佇列監聽,由於一般上異常通知是低概率事件,可根據不同的監聽容器進行差異化配置。
/**
*
* @ClassName 類名:HttpMessagerLister
* @Description 功能說明:http通知消費監聽介面
* <p>
* TODO
*</p>
************************************************************************
* @date 建立日期:2019年7月17日
* @author 建立人:oKong
* @version 版本號:V1.0
*<p>
***************************修訂記錄*************************************
*
* 2019年7月17日 oKong 建立該類功能。
*
***********************************************************************
*</p>
*/
@Component
@Slf4j
public class HttpMessagerLister {
@Autowired
HttpMessagerService messagerService;
@RabbitListener(id = "httpMessageNotifyConsumer", queues = {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME}, containerFactory = "notifyListenerContainer")
public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception {
doHandler(message, channel);
}
@RabbitListener(id= "httpDelayMessageNotifyConsumer", queues = {
ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME,}, containerFactory = "delayNotifyListenerContainer")
public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception {
doHandler(message, channel);
}
private void doHandler(Message message, Channel channel) throws Exception {
String body = new String(message.getBody(),"utf-8");
String queue = message.getMessageProperties().getConsumerQueue();
log.info("接收到通知請求:{},佇列名:{}",body, queue);
//訊息物件轉換
try {
HttpEntity httpNotifyDto = JSONUtil.toBean(body, HttpEntity.class);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//傳送通知
messagerService.notify(queue, httpNotifyDto);
} catch(Exception e) {
log.error(e.getMessage());
//ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
HttpMessagerService.java
:訊息真正處理的類,此類是關鍵,這裡未進行日誌記錄,真實場景中,強烈建議進行訊息通知的日誌儲存,防止日後資訊的檢視,同時也能通過傳送狀態,在重試次數都失敗後,進行定時再次傳送功能,同時也有據可查。
@Component
@Slf4j
public class HttpMessagerService {
@Autowired
AmqpTemplate mqTemplate;
public void notify(String queue,HttpEntity httpEntity) {
//發起請求
log.info("開始發起http請求:{}", httpEntity);
try {
switch(httpEntity.getMethod().toLowerCase()) {
case "POST":
HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams());
break;
case "GET":
default:
HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams());
}
} catch (Exception e) {
//發生異常,放入延遲佇列中
String nextRk = ApplicationConstant.delayRefMap.get(queue);
if(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) {
//若已經是最後一個延遲佇列的訊息隊列了,則後續可直接放入資料庫中 待後續定時策略進行再次傳送
log.warn("http通知已經通知N次失敗,進入定時進行發起通知,url={}", httpEntity.getUrl());
} else {
log.warn("http重新發送通知:{}, 通知佇列rk為:{}, 原佇列:{}", httpEntity.getUrl(), nextRk, queue);
mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
}
}
}
}
3.建立控制層服務(真實場景中,如SpringCloud
微服務中,一般上是建立個api介面,供其他服務進行呼叫)
@Slf4j
@RestController
@Api(tags = "http測試介面")
public class HttpDemoController {
@Autowired
AmqpTemplate mqTemplate;
@PostMapping("/send")
@ApiOperation(value="send",notes = "傳送http測試")
public String sendHttp(@RequestBody HttpEntity httpEntity) {
//傳送http請求
log.info("開始發起http請求,釋出非同步訊息:{}", httpEntity);
mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, ApplicationConstant.HTTP_MESSAGE_START_RK, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
return "傳送成功:url=" + httpEntity.getUrl();
}
}
4.配置檔案新增RabbitMQ
相關配置資訊
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 通知-消費者執行緒數 設定大點 大概率是能通知到的
http.notify.concurrency=150
# 延遲佇列的消費者執行緒數 可設定小點
http.notify.delay.concurrency=10
5.編寫啟動類。
@SpringBootApplication
@Slf4j
public class DelayQueueApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(DelayQueueApplication.class, args);
log.info("spring-boot-rabbitmq-delay-queue-chapter38服務啟動!");
}
}
6.啟動服務。使用swagger
進行簡單呼叫測試。
- 正常通知:
2019-07-20 23:52:23.792 INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController : 開始發起http請求,釋出非同步訊息:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知請求:{"method":"get","params":{"a":1},"url":"www.baidu.com"},佇列名:http.message.start
2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService : 開始發起http請求:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
- 異常通知:訪問一個不存在的地址
2019-07-20 23:53:14.699 INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController : 開始發起http請求,釋出非同步訊息:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知請求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},佇列名:http.message.start
2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : 開始發起http請求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
2019-07-20 23:53:14.706 WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : http重新發送通知:www.baidu.com1, 通知佇列rk為:delay.one, 原佇列:http.message.start
在RabbitMQ
後臺中,可以看見http.message.dlx.one
佇列中存在這需要延時處理的訊息,在一分鐘後會轉發至http.message.one
佇列中。
在一分鐘後,可以看見訊息本再次消費了。
2019-07-20 23:54:14.722 INFO 65216 --- [TaskExecutor-16] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知請求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},佇列名:http.message.one
2019-07-20 23:54:14.723 INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : 開始發起http請求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
2019-07-20 23:54:14.723 WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : http通知已經通知N次失敗,進入定時進行發起通知,url=www.baidu.com1
一些最佳實踐
在正式場景中,一般上補償或者重試機制大概率是不會發送的,倘若發生時,一般上是第三方業務系統出現了問題,故一般上在進行補充時,應該在非高峰期進行操作,故應該對延時監聽器,應該在高峰期時停止消費,在非高峰期時進行消費。同時,還可以根據不同的通知型別,放入不一樣的延時佇列中,保障業務的正常。這裡簡單說明下,動態停止或者啟動演示監聽器的方式。一般上是使用
RabbitListenerEndpointRegistry
物件獲取延時監聽器,之後進行動態停止或者啟用。可設定@RabbitListener
的id屬性,直接進行獲取,當然也可以直接獲取所有的監聽器,進行自定義判斷了。
@Autowired
RabbitListenerEndpointRegistry registry;
@GetMapping("/set")
@ApiOperation(value = "set", notes = "設定訊息監聽器的狀態")
public String setSimpleMessageListenerContainer(String status) {
if("1".equals(status)) {
registry.getListenerContainer("httpDelayMessageNotifyConsumer").start();
} else {
registry.getListenerContainer("httpDelayMessageNotifyConsumer").stop();
}
return status;
}
這裡,只是簡單進行演示說明,在真實場景下,可以使用定時器,判斷當前是否為高峰期,進而進行動態設定監聽器的狀態。
參考資料
- https://www.rabbitmq.com/admin-guide.html
- https://www.rabbitmq.com/ttl.html
總結
本文主要簡單介紹了基於
RabbitMQ
實現延時佇列的功能。對於需要實現更加靈活的配置及功能時,如可自定義配置通知次數等,大家可根據自己的需求進行新增,可以使用動態建立佇列的方式。當然使用延時佇列的方式還有很多,比如可以使用redis
的key值過期回撥機制使用,也可以使用定時機制。另,發現好久沒有寫文章了,感覺寫的有點亂,還望見諒呀~
最後
目前網際網路上很多大佬都有
SpringBoot
系列教程,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支援。若文中有所錯誤之處,還望提出,謝謝。
老生常談
- 個人QQ:
499452441
- 微信公眾號:
lqdevOps
個人部落格:http://blog.lqdev.cn
完整示例:基於RabbitMQ實現訊息延遲佇列方案
原文地址:https://blog.lqdev.cn/2019/07/21/springboot/chapter-thirty-eig