RocketMQ 死信佇列 | 消費者出現異常如何處理?
阿新 • • 發佈:2020-06-27
在RocketMQ 重複消費問題 | 訂單系統核心流程引入冪等性機制一文中,我們討論了訊息重複消費的問題,比較好的方案是採用在消費側使用業務判斷法來保證介面的冪等性,這樣就能避免訊息重複消費的問題。
今天要討論的是消費者程式碼執行過程中出現異常,我們應該如何處理?
手動提交 offset
首先來看一段程式碼,Consumer
類是一個消費者類,它我們為它註冊了一個監聽器,在處理完訊息之後,會將訊息的狀態返回給 RocketMQ,執行成功返回的是訊息狀態是 CONSUME_SUCCESS
。
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); // 設定 NameServer 地址 consumer.setNamesrvAddr(""); // 訂閱 Topic consumer.subscribe("TopicTest", "*"); // 這次回撥介面,接收訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 對訊息的處理,比如發放優惠券、積分等 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
畫一張圖來表示向 RocketMQ 提交訊息狀態的流程,如圖所示:
訊息者業務程式碼出現異常怎麼辦?
再來看一下消費者的程式碼中監聽器的部分,它說如果訊息處理成功,那麼就返回訊息狀態為 CONSUME_SUCCESS
,也有可能發放優惠券、積分等操作出現了異常,比如說資料庫掛掉了。這個時候應該怎麼處理呢?
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 對訊息的處理,比如發放優惠券、積分等 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
我們可以把程式碼改一改,捕獲異常之後返回訊息的狀態為 RECONSUME_LATER
表示稍後重試。
// 這次回撥介面,接收訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { // 對訊息的處理,比如發放優惠券、積分等 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 萬一發生資料庫宕機等異常,返回稍後重試訊息的狀態 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } });
重試佇列
這個時候,訊息會進入到 RocketMQ 的重試佇列中。
- 比如說消費者所屬的訊息組名稱為
AAAConsumerGroup
- 其重試佇列名稱就叫做
%RETRY%AAAConsumerGroup
- 重試佇列中的訊息過一段時間會再次傳送給消費者,如果還是無法正常執行會再次進入重試佇列
- 預設重試16次,還是無法執行,訊息就會從重試佇列進入到死信佇列
死信佇列
- 重試佇列中的訊息重試16次任然無法執行,將會進入到死信佇列
- 死信佇列的名字是
%DLQ%AAAConsumerGroup
- 死信佇列中的訊息可以後臺開一個執行緒,訂閱
%DLQ%AAAConsumerGroup
,並不停重試
總結
本文從消費者的業務程式碼出現異常講起,介紹了 RocketMQ 的重試佇列和死信佇列:
- 程式碼正常執行返回訊息狀態為
CONSUME_SUCCESS
,執行異常返回RECONSUME_LATER
- 狀態為
RECONSUME_LATER
的訊息會進入到重試佇列,重試佇列的名稱為%RETRY% + ConsumerGroupName
; - 重試16次訊息任然沒有處理成功,訊息就會進入到死信佇列
%DLQ% + ConsumerGroupName
;