Spring Cloud Stream消費失敗後的處理策略(三):使用DLQ隊列(RabbitMQ)
應用場景
前兩天我們已經介紹了兩種Spring Cloud Stream對消息失敗的處理策略:
- 自動重試:對於一些因環境原因(如:網絡抖動等不穩定因素)引發的問題可以起到比較好的作用,提高消息處理的成功率。
- 自定義錯誤處理邏輯:如果業務上,消息處理失敗之後有明確的降級邏輯可以彌補的,可以采用這種方式,但是2.0.x版本有Bug,2.1.x版本修復。
那麽如果代碼本身存在邏輯錯誤,無論重試多少次都不可能成功,也沒有具體的降級業務邏輯,之前在深入思考中討論過,可以通過日誌,或者降級邏輯記錄的方式把錯誤消息保存下來,然後事後分析、修復Bug再重新處理。但是很顯然,這樣做非常原始,並且太過笨拙,處理復雜度過高。所以,本文將介紹利用中間件特性來便捷地處理該問題的方案:使用RabbitMQ的DLQ隊列。
動手試試
準備一個會消費失敗的例子,可以直接沿用前文的工程。也可以新建一個,然後創建如下代碼的邏輯:
@EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生產接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return "ok"; } } /** * 消息消費邏輯 */ @Slf4j @Component static class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received payload : " + payload); throw new RuntimeException("Message consumer failed!"); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); } }
內容很簡單,既包含了消息的生產,也包含了消息消費。消息消費的時候主動拋出了一個異常來模擬消息的消費失敗。
在啟動應用之前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名)、並設置一下分組,比如:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1 spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true spring.cloud.stream.bindings.example-topic-output.destination=test-topic
這裏加入了一個重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
,用來開啟DLQ(死信隊列)。完成了上面配置之後,啟動應用並訪問localhost:8080/sendMessage?message=hello
接口來發送一個消息到MQ中了,此時可以看到消費失敗後拋出了異常,消息消費失敗,記錄了日誌。此時,可以查看RabbitMQ的控制臺如下:
其中,test-topic.stream-exception-handler.dlq
隊列就是test-topic.stream-exception-handler
的dlq(死信)隊列,當test-topic.stream-exception-handler
隊列中的消息消費時候之後,就會將這條消息原封不動的轉存到dlq隊列中。這樣這些沒有得到妥善處理的消息就通過簡單的配置實現了存儲,之後,我們還可以通過簡單的操作對這些消息進行重新消費。我們只需要在控制臺中點擊test-topic.stream-exception-handler.dlq
隊列的名字進入到詳情頁面之後,使用Move messages
功能,直接將這些消息移動回test-topic.stream-exception-handler
隊列,這樣這些消息就能重新被消費一次。
如果Move messages功能中是如下內容:
To move messages, the shovel plugin must be enabled, try:
$ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
那是由於沒有安裝對應的插件,只需要根據提示的命令安裝就能使用該命令了。
深入思考
先來總結一下在引入了RabbitMQ的DLQ之後,對於消息異常處理更為完整一些的基本思路:
- 瞬時的環境抖動引起的異常,利用重試功能提高處理成功率
- 如果重試依然失敗的,日誌報錯,並進入DLQ隊列
- 日誌告警通知相關開發人員,分析問題原因
- 解決問題(修復程序Bug、擴容等措施)之後,DLQ隊列中的消息移回重新處理
在這樣的整體思路中,可能還涉及一些微調,這裏舉幾個常見例子,幫助讀者進一步了解一些特殊的場景和配置使用!
場景一:有些消息在業務上存在時效性,進入死信隊列之後,過一段時間再處理已經沒有意義,這個時候如何過濾這些消息呢?
只需要配置一個參數即可:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000
該參數可以控制DLQ隊列中消息的存活時間,當超過配置時間之後,該消息會自動的從DLQ隊列中移除。
場景二:可能進入DLQ隊列的消息存在各種不同的原因(不同異常造成的),此時如果在做補救措施的時候,還希望根據這些異常做不同的處理時候,我們如何區分這些消息進入DLQ的原因呢?
再來看看這個參數:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true
該參數默認是false,如果設置了死信隊列的時候,會將消息原封不動的發送到死信隊列(也就是上面例子中的實現),此時大家可以在RabbitMQ控制臺中通過Get message(s)
功能來看看隊列中的消息,應該如下圖所示:
這是一條原始消息。
如果我們該配置設置為true的時候,那麽該消息在進入到死信隊列的時候,會在headers中加入錯誤信息,如下圖所示:
這樣,不論我們是通過移回原通道處理還是新增訂閱處理這些消息的時候就可以以此作為依據進行分類型處理了。
關於RabbitMQ的binder中還有很多關於DLQ的配置,這裏不一一介紹了,上面幾個是目前筆者使用過的幾個,其他一些暫時認為采用默認配置已經夠用,除非還有其他定制要求,或者是存量內容,需要去適配才會去配置。讀者可以查看官方文檔了解更多詳情!
代碼示例
本文示例讀者可以通過查看下面倉庫的中的stream-exception-handler-3
項目:
- Github
- Gitee
如果您對這些感興趣,歡迎star、follow、收藏、轉發給予支持!
以下專題教程也許您會有興趣
- Spring Boot基礎教程
- Spring Cloud基礎教程
本文首發:http://blog.didispace.com/spring-cloud-starter-finchley-7-4/
Spring Cloud Stream消費失敗後的處理策略(三):使用DLQ隊列(RabbitMQ)