HM-SpringCloud微服務系列12.1【訊息可靠性】
HM-SpringCloud微服務系列12:服務非同步通訊RabbitMQ-高階篇
回顧一下MQ的訊息傳送流程:訊息msg從生產者publisher傳送到交換機exchange,再到佇列queue,最後到消費者consumer
其中的每一步都可能導致訊息丟失,常見的丟失原因包括:
- 傳送時丟失:
- 生產者傳送的訊息未送達exchange
- 訊息到達exchange後未到達queue
- MQ宕機,queue將訊息丟失
- consumer接收到訊息後未消費就宕機
針對這些問題,RabbitMQ分別給出瞭解決方案:
- 生產者確認機制
- mq持久化
- 消費者確認機制
- 失敗重試機制
下面通過案例來演示每一個步驟。首先匯入課前資料提供的demo工程:
虛擬機器MQ環境準備:
1. 生產者訊息確認
1.1 publisher confirm機制
RabbitMQ提供了publisher confirm機制來避免訊息傳送到MQ過程中丟失。這種機制必須給每個訊息指定一個唯一ID。訊息傳送到MQ以後,會返回一個結果給傳送者,表示訊息是否處理成功。返回結果有兩種方式:
- publisher-confirm,傳送者確認
- 訊息成功投遞到交換機,返回ack
- 訊息未投遞到交換機,返回nack
- publisher-return,傳送者回執
- 訊息投遞到交換機了,但是沒有路由到佇列。返回ACK,及路由失敗原因。
1.2 SpringAMQP實現生產者確認
1.2.1 修改配置
修改publisher服務中的application.yml檔案,新增下面的內容:
spring:
rabbitmq:
publisher-confirm-type: correlated # 非同步回撥模式
publisher-returns: true
template:
mandatory: true
說明:
-
publish-confirm-type
:開啟publisher-confirm,這裡支援兩種型別:-
simple
:同步等待confirm結果,直到超時 -
correlated
:非同步回撥,定義ConfirmCallback,MQ返回結果時會回撥這個ConfirmCallback
-
-
publish-returns
:開啟publish-return功能,同樣是基於callback機制,不過是定義ReturnCallback -
template.mandatory
:定義訊息路由失敗時的策略。true,則呼叫ReturnCallback;false:則直接丟棄訊息
1.2.2 定義Return回撥
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在專案載入時配置:
修改publisher服務,新增一個配置類:
package cn.itcast.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware { // 實現Bean工廠通知
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate物件
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback(lambda表示式本質是匿名內部類。匿名內部類所實現介面中只有一個方法時可以用lambda替換)
/*rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
}); */
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 訊息投遞失敗,記錄日誌
log.error("訊息傳送到佇列失敗,響應碼:{},失敗原因:{},交換機:{},路由key:{},訊息:{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業務需要,可以在此之下重發訊息
});
}
}
交換機與佇列手動繫結:
之前課程學MQ時是這樣傳送訊息的:
1.2.3 定義ConfirmCallback
ConfirmCallback可以在傳送訊息時指定,因為每個業務處理confirm成功或失敗的邏輯不一定相同。
在publisher服務的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個單元測試方法testSendMessage2SimpleQueue2():
package cn.itcast.mq.spring;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.UUID;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String routingKey = "simple.test";
// 1. 準備訊息
String message = "hello, spring amqp!";
// 2. 傳送訊息
rabbitTemplate.convertAndSend("amq.topic", routingKey, message);
}
@Test
public void testSendMessage2SimpleQueue2() throws InterruptedException {
// 1. 準備訊息
String message = "hello, spring amqp!";
// 2. 準備CorrelationData
// 2.1 訊息ID(全域性唯一的訊息ID,需要封裝到CorrelationData中)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2 準備ConfirmCallback
/*correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
@Override
public void onSuccess(CorrelationData.Confirm result) {}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {}
});*/
correlationData.getFuture().addCallback(result -> {
// 判斷結果
if (result.isAck()) {
//ACK:訊息成功
log.debug("訊息成功投遞到交換機!訊息ID:{}", correlationData.getId());
}else {
//NACK:訊息失敗
log.error("訊息投遞到交換機失敗!訊息ID:{}", correlationData.getId());
// 如果有業務需要,可以在此之下重發訊息
}
}, ex -> {
// 訊息投遞失敗,記錄日誌
log.error("訊息傳送異常!", ex);
// 如果有業務需要,可以在此之下重發訊息
});
// 3. 傳送訊息
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
// rabbitTemplate.convertAndSend("aaaamq.topic", "simple.test", message, correlationData); //故意填錯交換機模擬投遞失敗
// rabbitTemplate.convertAndSend("amq.topic", "aaa.simple.test", message, correlationData); //故意填錯路由模擬投遞異常
}
}
1.2.4 測試
成功情況:
失敗情況1:
失敗情況2:
1.3 小結
SpringAMQP中處理訊息確認的幾種情況:
-
publisher-comfirm:
- 訊息成功傳送到exchange,返回ack
- 訊息傳送失敗,沒有到達交換機,返回nack
- 訊息傳送過程中出現異常,沒有收到回執
-
訊息成功傳送到exchange,但沒有路由到queue,呼叫ReturnCallback
2. 訊息持久化
2.1 持久化機制
生產者確認可以確保訊息投遞到RabbitMQ的佇列中,但是訊息傳送到RabbitMQ以後,如果突然宕機,也可能導致訊息丟失。
要想確保訊息在RabbitMQ中安全儲存,必須開啟訊息持久化機制。
- 交換機持久化
- 佇列持久化
- 訊息持久化
非持久化情景演示:
接1.2.4最後結果檢視
重啟虛擬機器docker中的mq服務
再來檢視管理介面,發現系統的amq.topic
交換機還在;但剛剛自己建立的sample.queue
佇列沒了,那訊息也隨之沒了
2.2 實現持久化
2.2.1 交換機持久化
RabbitMQ中交換機預設是非持久化的,mq重啟後就丟失。
SpringAMQP中可以通過程式碼指定交換機持久化:
@Bean
public DirectExchange simpleDirect() {
// 三個引數:交換機名稱、是否持久化、當沒有queue與其繫結時是否自動刪除
return new DirectExchange("simple.direct", true, false);
}
2.2.2 佇列持久化
RabbitMQ中佇列預設是非持久化的,mq重啟後就丟失。
SpringAMQP中可以通過程式碼指定交換機持久化:
@Bean
public Queue simpleQueue() {
// 使用QueueBuilder構建佇列,durable()持久化;nonDurable()非持久化
return QueueBuilder.durable("simple.queue").build();
}
啟動consumer服務測試
可以在RabbitMQ控制檯看到持久化的交換機、佇列都會帶上D
的標示:
停止consumer服務,手動傳送訊息測試
既然佇列也已經持久化了,現在重啟一下mq,看看是否訊息還在
重啟完成,發現交換機、佇列還在,不過佇列裡的訊息沒了
說明訊息未實現持久化
2.2.3 訊息持久化
利用SpringAMQP傳送訊息時,可以設定訊息的屬性(MessageProperties),指定delivery-mode:
- 1:非持久化
- 2:持久化
用java程式碼指定:
@Test
public void testDurableMsg() {
// 1. 準備訊息
Message message = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //NON_PERSISTENT是非持久
.build();
// 2. 傳送訊息
rabbitTemplate.convertAndSend("simple.queue", message);
log.debug("訊息傳送成功");
}
mode=2即為持久化
重啟mq
重新整理mq管理介面,可以看到訊息還在,證明訊息持久化了
2.2.4 實際上上面三者預設持久化(原始碼分析)
事實上,預設情況下,由SpringAMQP宣告的交換機、佇列都是持久化的。
預設情況下,SpringAMQP發出的任何訊息都是持久化的,不用特意指定。
3. 消費者訊息確認
RabbitMQ是閱後即焚機制,RabbitMQ確認訊息被消費者消費後會立刻刪除。
而RabbitMQ是通過消費者回執來確認消費者是否成功處理訊息的:消費者獲取訊息後,應該向RabbitMQ傳送ACK回執,表明自己已經處理訊息。
設想這樣的場景:
- 1)RabbitMQ投遞訊息給消費者
- 2)消費者獲取訊息後,返回ACK給RabbitMQ
- 3)RabbitMQ刪除訊息
- 4)消費者宕機,訊息尚未處理
這樣,訊息就丟失了。因此消費者返回ACK的時機非常重要。
而SpringAMQP則允許配置三種確認模式:
-
manual:手動ack,需要在業務程式碼結束後,呼叫api傳送ack。
-
auto:自動ack,由spring(AOP-環繞增強)監測listener程式碼是否出現異常,沒有異常則返回ack;丟擲異常則返回nack
-
none:關閉ack,MQ假定消費者獲取訊息後會成功處理,因此訊息投遞後立即被刪除
由此可知:
- none模式下,訊息投遞是不可靠的,可能丟失
- auto模式類似事務機制,出現異常時返回nack,訊息回滾到mq;沒有異常,返回ack
- manual:自己根據業務情況,判斷什麼時候該ack
一般,我們都是使用預設的auto即可。
3.1 演示none模式
修改consumer服務的application.yml檔案
修改consumer服務的SpringRabbitListener類中的方法,模擬一個訊息處理異常:
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消費者接收到simple.queue的訊息:【" + msg + "】");
System.out.println(1/0); //模擬異常:手動錯誤
log.info("消費者處理訊息成功!");
}
測試可以發現,當訊息處理拋異常時,訊息依然被RabbitMQ刪除了。
當前,simple.queue中有兩條訊息
debug啟動Consumer服務
啟動成功立馬進入了斷點,然後現在檢視mq管理頁面發現simple.queue中訊息沒了
但可以看到,消費者這裡還有訊息;接著除錯
可惜的是,拋異常了-->消費者沒了,然後訊息也丟失了
3.2 演示auto模式
修改consumer服務的application.yml檔案,把確認機制修改為auto:
先在simple.queue中手動準備一條訊息
在異常位置打斷點,再次傳送訊息,程式卡在斷點時,可以發現此時訊息狀態為unack(未確定狀態):
啟動成功立刻進入斷點,此時可以看到mq管理介面中顯示訊息狀態由ready變為unacked
丟擲異常後,因為Spring會自動返回nack,所以訊息恢復至Ready狀態,並且沒有被RabbitMQ刪除:
報錯後,訊息被重新投遞了,並沒有消失
4. 消費失敗重試機制
當消費者出現異常後,訊息會不斷requeue(重入隊)到佇列,再重新發送給消費者,然後再次異常,再次requeue,無限迴圈,導致mq的訊息處理飆升,帶來不必要的壓力:
怎麼辦呢?
4.1 本地重試
我們可以利用Spring的retry機制,在消費者出現異常時利用本地重試,而不是無限制的requeue到mq佇列。
修改consumer服務的application.yml檔案,新增內容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初始的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval
max-attempts: 3 # 最大重試次數
stateless: true # true無狀態(預設);false有狀態。如果業務中包含事務,這裡改為false,不然會導致事務失效
重啟consumer服務,重複之前的測試。可以發現:
- 在重試3次後,SpringAMQP會丟擲異常AmqpRejectAndDontRequeueException,說明本地重試觸發了
- 檢視RabbitMQ控制檯,發現訊息被刪除了,說明最後SpringAMQP返回的是ack,mq刪除訊息了
結論:
- 開啟本地重試時,訊息處理過程中丟擲異常,不會requeue到佇列,而是在消費者本地重試
- 重試達到最大次數後,Spring會返回ack,訊息會被丟棄
4.2 失敗策略RepublishMessageRecoverer
在之前的測試中,達到最大重試次數後,訊息會被丟棄,這是由Spring內部機制決定的。
在開啟重試模式後,重試次數耗盡,如果訊息依然失敗,則需要有MessageRecovery介面來處理,它包含三種不同的實現:
-
RejectAndDontRequeueRecoverer:重試耗盡後,直接reject,丟棄訊息。預設就是這種方式
-
ImmediateRequeueMessageRecoverer:重試耗盡後,返回nack,訊息重新入隊
-
RepublishMessageRecoverer:重試耗盡後,將失敗訊息投遞到指定的交換機
這種情況下第一種方案RejectAndDontRequeueRecoverer會直接將訊息丟棄;而第二種方案ImmediateRequeueMessageRecoverer則會將訊息重新塞入simple.queue。
比較優雅的一種處理方案是RepublishMessageRecoverer,失敗後將訊息投遞到一個指定的,專門存放異常訊息的佇列,後續由人工集中處理。
4.2.1 在consumer服務中定義處理失敗訊息的交換機和佇列
4.2.2 定義一個RepublishMessageRecoverer關聯佇列和交換機
4.2.3 測試
完整程式碼
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorMsgConfig {
/**
* 異常訊息交換機
* @return
*/
@Bean
public DirectExchange errorMsgExchange() {
return new DirectExchange("error.direct");
}
/**
* 異常訊息佇列
* @return
*/
@Bean
public Queue errorMsgQueue() {
return new Queue("error.queue");
}
/**
* 繫結佇列&交換機
* @return
*/
@Bean
public Binding errorMsgBinding() {
// 利用BindingBuilder,先指定佇列,再指定交換機,最後寫路由key
return BindingBuilder
.bind(errorMsgQueue())
.to(errorMsgExchange())
.with("error");
}
/**
* 關聯佇列&交換機
* @return
*/
@Bean
public MessageRecoverer republishMsgRecover(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
// 注意此處的交換機名字和路由key一定要與上面定義的一致
}
}
可以看到,不僅訊息被傳遞過來了,而且異常資訊也被完整呈現
推薦這種方案
5. 總結
[面試題] 如何確保RabbitMQ訊息的可靠性?
- 開啟生產者確認機制,確保生產者的訊息能到達佇列
- 開啟持久化功能,確保訊息未消費前在佇列中不會丟失
- 開啟消費者確認機制為auto,由spring確認訊息處理成功後完成ack
- 開啟消費者失敗重試機制,並設定MessageRecoverer,多次重試失敗後將訊息投遞到異常交換機,交由人工處理