1. 程式人生 > 其它 >RabbitMq高階之如何保證訊息傳送可靠性

RabbitMq高階之如何保證訊息傳送可靠性

1.RabbitMq的傳送機制
學過RabbitMq的同學們大概都知道了RabbitMq傳送機制引入了Exchange(交換機的概念),訊息傳送方,首先把訊息傳送到交換機這是第一個步驟,然後交換機在把訊息路由到不同的佇列中(Queue)這是第二個步驟,在有不同的消費者去消費。

注意:大致,知道訊息傳送的整個過程後,大概知道了要想保證訊息傳送成功主要是從兩個方面出發

1.訊息成功傳送到交換機(Exchange)
2.交換機成功路由到佇列(Queue)

2.針對兩種情況RabbitMq解決方案
為了解決這個兩個方案RabbitMq給出了倆種方案,事務機制和傳送方確認機制,由於事務機制過於耗費效能,效率比較有點低.這裡著重講解發送方確認機制
傳送方確認機制,就是訊息傳送到MQ那端後,MQ會回一個確認收到的訊息給我們.

2.1傳送方確認機制
在講解這個,首先介紹兩個介面ConfirmCallback和ReturnsCallback.
ConfirmCallback介面的作用:重寫次介面的confirm方法,可以確認訊息是否傳送到Exchange.
ReturnsCallback介面的作用:重寫次介面的returnedMessage方法,可以確認訊息從EXchange路由到Queue失敗,此方法的回撥是個失敗的回撥,訊息從Exchange路由到Queue失敗才會回撥這個方法。

配置:

spring:
application:
name: mq-test
rabbitmq:
host: ####
port: #####
#開啟訊息返回 訊息到達佇列的回撥
publisher-returns: true
#開啟訊息確認機制 訊息到達交換器的確認回撥
publisher-confirm-type: correlated

publisher-confirm-type配置主要有三個值:

none:表示禁用釋出確認模式,預設即此。
correlated:表示成功釋出訊息到交換器後會觸發的回撥方法。
simple:類似 correlated
實現上面倆個介面具體配置如下:

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

@Resource
RabbitTemplate rabbitTemplate;

@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this); // 指定 ConfirmCallback
rabbitTemplate.setReturnsCallback(this); // 指定 ReturnCallback
}
//這個主要是把訊息路由到交換機 沒有推到交換機會回撥 在這裡可以設定訊息傳送成功後的過程
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("{}:訊息成功到達交換器",correlationData.getId()); // correlationData作用是給訊息增加一個唯一性ID
}else{
log.error("{}:訊息傳送失敗", correlationData.getId());
}
}
// 交換機路由訊息到佇列 沒有路由成功會報錯
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("{}:訊息未成功路由到佇列",returned.getMessage().getMessageProperties().getMessageId());
}

}

接下來測試沒有傳送到交換機的情況:這是一個不存在的交換機

@Test
public void testNotExchange(){
rabbitTemplate.convertAndSend("test_not_exchange","test_rouking","測試沒有傳送到交換機的回撥".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
}

測試有交換機但是沒有佇列的情況:

@Test
public void testNotExchange(){
rabbitTemplate.convertAndSend("test_exchange","test_rouking__not","測試沒有路由到佇列的回撥".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
}

至此,就是RabbitMQ保證訊息可靠性的解決方案。

3.RabbitMQ重試機制
大資料培訓學習RabbitMQ的重試機制這裡主要也是個倆個原因:一是沒找到MQ的情況,二是找到MQ了,但是訊息傳送失敗了。

1.SpringBoot自帶的重試機制
配置如下:

spring:
application:
name: mq-test
rabbitmq:
host: #######
port: 5672
publisher-returns: true
# none:表示禁用釋出確認模式,預設即此。
# correlated:表示成功釋出訊息到交換器後會觸發的回撥方法。
# simple:類似 correlated,並且支援 waitForConfirms() 和 waitForConfirmsOrDie() 方法的呼叫。
publisher-confirm-type: correlated
# 此處配置重試機制 跟mq沒關係,基於springboot的重試機制
template:
retry:
enabled: true #開啟重試機制
initial-interval: 1000ms #重試起始間隔時間
max-attempts: 10 #最大重試次數
max-interval: 10000ms #最大重試次數
multiplier: 2 #間隔時間乘數

2.業務重試機制
針對訊息沒有成功到交換機,針對這種情況,回撥做相關的業務邏輯即可.
具體思路:
可以使用資料庫在每次傳送訊息的時候寫入庫裡給此條訊息增加一個狀態,在交換機回撥處,做相應的處理邏輯.
1.收到失敗回撥對此訊息做資料庫的修改,是重試幾次後,還是傳送失敗那麼就不在重新發送此條訊息,改變此條訊息的狀態即可。
2.如果訊息收到此條訊息傳送成功就改變次條訊息的狀態。
3.當然了,傳送失敗的訊息,重試次數超過後,定時器去找失敗的訊息,做業務處理即可。
注意:這裡會有訊息重複傳送的情況,在消費方做好訊息的冪等性即可。