RabbitMQ之訊息生產消費
前言
前面學習了RabbitMQ的使用場景和基礎概念,現在來用一個RabbitMQ案例,來正式學習一下MQ的使用。
這個案例分為兩個部分,訊息生產者和訊息消費者。模擬使用者註冊場景,生產者將使用者手機號傳送到MQ,消費者監聽MQ佇列,獲取使用者手機號傳送簡訊。
具體實現功能如下:
- 使用RabbitMQ客戶端訊息confirm和redis,確保訊息正確傳送到MQ,不會產生丟失;
- 結合訊息過期(TTL)和死信佇列(DLX)實現消費異常的延遲重試;
- 訊息處理失敗達到最大重試次數之後,將其傳送到失敗佇列,等待人工處理。
正文
專案版本:Springboot2.0.4、RabbitMQ3.7.7
專案具體流程如下:
具體程式碼
1、定義一個訊息交換機和三個訊息佇列。
@Slf4j
@Configuration
public class RabbitConfig {
/**
* 宣告一個交換機
* @return
*/
@Bean
public TopicExchange smsCaptchaExchange() {
return new TopicExchange("sms_captcha", true, false);
}
/**
* 正常的消費佇列
* @return
*/
@Bean
public Queue smsCaptchaQueue() {
return new Queue(" [email protected]", true, false, false);
}
/**
* 延時重試佇列
*/
@Bean
public Queue smsCaptchaRetryQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10 * 1000);
arguments.put("x-dead-letter-exchange", "sms_captcha");
arguments.put("x-dead-letter-routing-key", "sms.captcha");
return new Queue(" [email protected]@retry", true, false, false, arguments);
}
/**
* 處理失敗後存放訊息的佇列
* @return
*/
@Bean
public Queue smsCaptchaFailedQueue() {
return new Queue("[email protected]@failed", true, false, false);
}
/**
* 將佇列與交換機繫結
* @return
*/
@Bean
public Binding smsCaptchaBinding() {
return BindingBuilder.bind(smsCaptchaQueue()).to(smsCaptchaExchange()).with("sms.captcha");
}
@Bean
public Binding smsCaptchaRetryBinding() {
return BindingBuilder.bind(smsCaptchaRetryQueue()).to(smsCaptchaExchange()).with("sms.captcha.retry");
}
@Bean
public Binding smsCaptchaFailedBinding() {
return BindingBuilder.bind(smsCaptchaFailedQueue()).to(smsCaptchaExchange()).with("sms.captcha.failed");
}
}
這裡為了省事,只定義了一個交換機,這和三個交換機沒有區別。三個佇列分別為正常消費佇列、延遲重試佇列和訊息處理失敗後的存放佇列。
2、使用者請求註冊,並將註冊資訊包裝為message,傳送到MQ佇列中。
@PostMapping(value = "register")
public User register(@RequestBody User user) {
String userJson = JSONObject.toJSONString(user);
log.info("使用者註冊,註冊資訊:{}", userJson);
String msgId = RandomUtil.getRandomUUID(20);
Message message = MessageBuilder.withBody(userJson.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setCorrelationId(msgId)
.build();
CorrelationData correlationData = new CorrelationData(msgId);
/**
* 將msgId與Message的關係儲存起來,放到快取中.
*/
redisTemplate.opsForHash().put("message", msgId, message);
redisTemplate.opsForHash().put("exchange", msgId, "sms_captcha");
redisTemplate.opsForHash().put("routingKey", msgId, "sms.captcha");
//TODO 有些時候,由於某些原因傳送端接收不到MQ的confirm,這條訊息可能已經丟了,需要重新發送。
//TODO 還需要定期查詢快取,檢視訊息的傳送時間距現在是否已經超過了一定時間,超過了這個時間但是訊息還存在快取中,則訊息很有可能已經丟了,傳送端沒有收到confirm,需要重新發送。
//TODO 需要快取的資料有點雜,redis這麼設計好嗎?
rabbitTemplate.convertAndSend("sms_captcha", "sms.captcha", message, correlationData);
return user;
}
快取訊息
可以看到上面程式碼中用redis快取了訊息、交換機和路由鍵。這是因為訊息傳送到MQ可能會失敗,一旦失敗訊息豈不是丟了?為避免這種情況,我們在業務程式碼中必須將message儲存起來,確保其進入了MQ再將儲存的訊息刪除。
那麼我們是怎麼知道訊息是否被正確傳送到RabbitMQ伺服器了呢?不用擔心,RabbitMQ有事務和confirm機制,可以確保訊息正確到傳送到RabbitMQ伺服器。但是由於事務會大大降低RabbitMQ的訊息處理能力,所以一般使用的是非同步的confirm機制。
生產者訊息確認機制
首先我們開啟rabbitMQ的生產者訊息確認機制。
spring:
rabbitmq:
publisher-confirms: true
publisher-returns: true
其中,設定spring.rabbitmq.publisher-confirms=true之後,RabbitMQ會監聽交換機,我們可以編寫自己的回撥方法,一旦訊息傳送到MQ的交換機,回撥方法就會被呼叫。
/**
* 當訊息傳送到交換機(exchange)時,回撥方法會被呼叫.
*/
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
redisTemplate.opsForHash().delete("message", correlationData.getId());
redisTemplate.opsForHash().delete("exchange", correlationData.getId());
redisTemplate.opsForHash().delete("routingKey", correlationData.getId());
} else {
log.error("訊息傳送失敗, cause:{}", cause);
Message message = (Message) redisTemplate.opsForHash().get("message", correlationData.getId());
String exchange = (String) redisTemplate.opsForHash().get("exchange", correlationData.getId());
String routingKey = (String) redisTemplate.opsForHash().get("routingKey", correlationData.getId());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
//TODO 會不會由於某些原因,ack一直是false,導致訊息不斷地被重發。這樣需要加發送次數的限制
}
});
上面回撥邏輯也很簡單,訊息成功進入交換機,就刪除快取中的訊息;交換機拒絕訊息或者其他原因,會再次嘗試傳送訊息。
設定spring.rabbitmq.publisher-returns=true的作用是,開啟returnListener,當訊息從交換機路由佇列失敗時,會呼叫returnCallback的回撥方法。
另外,使用returnCallback的前提必須設定mandatory屬性為true。mandatory屬性為true表示,如果交換機無法根據自身型別和路由鍵找到一個與之匹配的佇列,那麼RabbitMQ會將訊息返還給生產者,當設定為false時,RabbitMQ將直接丟棄訊息。
/**
* 當訊息從交換機到佇列失敗時,回撥方法被呼叫。(若成功,則不呼叫)
* 需要注意的是:該方法呼叫後,MsgSendConfirmCallBack中的confirm方法也會被呼叫,且ack = true
*/
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("message send to queue failed.");
log.error("exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
rabbitTemplate.convertAndSend(exchange, routingKey, message);
//重發次數應該加限制,以免路由錯誤或者佇列不存在
});
ack丟失怎麼辦?
訊息生產者可以通過返回的ack資訊知道訊息是否被髮送到RabbitMQ伺服器,但是有時候由於網路故障或者其他原因,RabbitMQ伺服器返回給傳送者的ack丟失了,傳送端不知道訊息是傳送成功了還是失敗了,這該怎麼辦呢?
答案就是重新發送,不管之前是否傳送成功,為了保證訊息必須被髮送到MQ伺服器,對於沒有迴應的訊息必須重新發送。解決方案就是在傳送訊息的時候對訊息的傳送時間也進行快取,定期掃描快取,如果超過一定時間訊息還在快取中,就重新發送此條訊息。
3、消費者監聽MQ佇列
@RabbitListener(queues = "[email protected]")
public void handleMessage(Message message, Channel channel) {
log.info("handleMessage begin... the message is : {}", message);
try {
User user = JSON.parseObject(new String(message.getBody()), User.class);
smsService.sendSms(user);
} catch (Exception e) {
log.error("handleMessage failed. error:", message, e);
Long retryCount = getRetryCount(message.getMessageProperties());
if (retryCount > 3) {
log.info("將訊息置入失敗佇列,等待人工處理.");
} else {
log.info("將訊息置入延時重試佇列,重試次數:" + retryCount);
rabbitTemplate.convertAndSend("sms_captcha_retry", "sms.captcha.retry", message);
}
}
//手動acknowledge
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
log.error("手動確認訊息失敗!");
e.printStackTrace();
}
}
RabbitMQ中,正常的訊息在轉換成Dead Letter時會在其header中新增一個名為“x-death”的陣列,其中包含了變成Dead Letter的次數count。我們可以根據這個特點,控制消費重試次數。
private Long getRetryCount(MessageProperties properties) {
Long retryCount = 0L;
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (headers.containsKey("x-death")) {
log.info("包含x-death頭部資訊.");
List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
if (deaths.size() > 0) {
Map<String, Object> death = deaths.get(0);
retryCount = (Long) death.get("count");
System.out.println("當前重試次數:" + retryCount);
}
}
}
return retryCount;
}
可以看到,消費者監聽MQ佇列,獲取訊息進行處理。如果訊息處理失敗,會將失敗的訊息傳送到延遲重試佇列中,再次消費。當重試次數達到三次,就不再重試,轉而將訊息傳送到失敗佇列,等待人工處理。
確保訊息冪等性
在訊息生產的時候提到過,由於ack可能會丟失,會導致訊息生產者存在訊息超發情況,所以消費者必須確保訊息的冪等性,以避免訊息的重複消費。
總結
參考部落格:
初寫部落格,語言組織不是很好,大家見諒。