Spring Boot系列——死信佇列
在說死信佇列之前,我們先介紹下為什麼需要用死信佇列。
如果想直接瞭解死信對接,直接跳入下文的"死信佇列"部分即可。
ack機制和requeue-rejected屬性
我們還是基於上篇《Spring Boot系列——7步整合RabbitMQ》的demo程式碼來說。
在專案springboot-demo我們看到application.yaml檔案部分配置內容如下
... listener: type: simple simple: acknowledge-mode: auto concurrency: 5 default-requeue-rejected: true max-concurrency: 100 ...
其中
acknowledge-mode
該配置項是用來表示訊息確認方式,其有三種配置方式,分別是none、manual和auto。
none意味著沒有任何的應答會被髮送。
manual意味著監聽者必須通過呼叫Channel.basicAck()來告知所有的訊息。
auto意味著容器會自動應答,除非MessageListener丟擲異常,這是預設配置方式。
default-requeue-rejected
該配置項是決定由於監聽器丟擲異常而拒絕的訊息是否被重新放回佇列。預設值為true。
我一開始對於這個屬性有個誤解,我以為rejected是表示拒絕,所以將requeue-rejected
所以如果該屬性配置為true表示會重新放回佇列,如果配置為false表示不會放回佇列。
下面我們看看acknowledge-mode引數和default-requeue-rejected引數使用不同的組合方式,RabbitMQ是如何處理訊息的。
程式碼依然使用springboot-demo中的RabbitApplicationTests傳送訊息,使用Receiver類監聽demo-queue佇列的訊息。
對於Receiver類添加了一行程式碼,該程式碼模擬丟擲異常
@Component
public class Receiver {
@RabbitListener(queues = "demo_queue")
public void created(String message) {
System.out.println("orignal message: " + message);
int i = 1/0;
}
}
acknowledge-mode=none, default-requeue-rejected=false
該配置不會確認訊息是否正常消費,所以在控制檯沒有丟擲任何異常。通過在RabbitMQ管理頁面也沒有看到重新放回佇列的訊息
acknowledge-mode=none, default-requeue-rejected=true
同樣該配置不會確認訊息是否正常消費,所以在控制檯沒有丟擲任何異常。而且即使default-requeue-rejected配置為true因為沒有確認所以也沒有看到重新放回佇列的訊息
acknowledge-mode=manual, default-requeue-rejected=false
該配置需要手動確認訊息是否正常消費,但是程式碼中並沒有手動確認,個人理解是因為沒有收到ack,所以訊息又回到了佇列中。
acknowledge-mode=manual, default-requeue-rejected=true
該配置需要手動確認訊息是否正常消費,但是程式碼中並沒有手動確認,所以訊息被重新放入到佇列中了,並且在控制檯發現還丟擲了異常(這塊不是很清楚,default-requeue-rejected設定true和false帶來的不同效果,有了解的麻煩下方留言指教)。
acknowledge-mode=auto, default-requeue-rejected=false
該配置採用自動確認,從結果來看,是自動確認了。
從控制檯列印的結果可以看出Receiver方法執行了3次,分別是前面兩條放回佇列的訊息以及這次傳送的訊息,所以3條訊息都消費了。
同時因為default-requeue-rejected設定為false,所以即使消費丟擲異常,也沒有將訊息放回佇列。
acknowledge-mode=auto, default-requeue-rejected=true
該配置同樣採用自動確認,從結果看出,沒有丟擲異常(這塊也不是很理解),且因為default-requeue-rejected設定為true,所以訊息重新回到佇列。
綜上羅列這麼多情況只為說明有些情況下,如果訊息消費出錯,因為配置問題導致訊息丟失了。這在很多情況下是要命的,比如使用者支付的訂單號,如果因為拋異常等原因直接丟失是很要命的。
所以,我們需要有一個確保機制,能夠保證即使失敗的訊息也能儲存下來,這時候死信佇列就排上用場了。
死信佇列
死信佇列的整個設計思路是這樣的
生產者 --> 訊息 --> 交換機 --> 佇列 --> 變成死信 --> DLX交換機 -->佇列 --> 消費者
下面我們通過網上的一個簡單的死信佇列的實現看看如何使用死信佇列。
@Bean("deadLetterExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
}
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 宣告 死信交換機
args.put("x-dead-letter-exchange", "DL_EXCHANGE");
// x-dead-letter-routing-key 宣告 死信路由鍵
args.put("x-dead-letter-routing-key", "KEY_R");
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
@Bean("redirectQueue")
public Queue redirectQueue() {
return QueueBuilder.durable("REDIRECT_QUEUE").build();
}
/**
* 死信路由通過 DL_KEY 繫結鍵繫結到死信佇列上.
*
* @return the binding
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
/**
* 死信路由通過 KEY_R 繫結鍵繫結到死信佇列上.
*
* @return the binding
*/
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
}
注意
-
聲明瞭一個direct模式的exchange。
-
聲明瞭一個死信佇列deadLetterQueue,該佇列配置了一些屬性
x-dead-letter-exchange
表明死信交換機,x-dead-letter-routing-key
表明死信路由鍵,因為是direct模式,所以需要設定這個路由鍵。 -
聲明瞭一個替補佇列redirectQueue,變成死信的訊息最終就是存放在這個佇列的。
-
宣告繫結關係,分別是死信佇列以及替補佇列和交換機的繫結。
那麼如何模擬生成一個死信訊息呢,可以在傳送到DL_QUEUE的訊息在10秒後失效,然後轉發到替補佇列中,程式碼實現如下
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 設定編碼
messageProperties.setContentEncoding("utf-8");
// 設定過期時間10*1000毫秒
messageProperties.setExpiration("5000");
return message;
};
rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor);
}
執行結果如下
訊息首先進入DL_QUEUE,5秒後失效,被轉發到REDIRECT_QUEUE中。
來源:https://www.cnblogs.com/bigdataZJ/p/springboot-deadletter-queue.html