1. 程式人生 > 實用技巧 >RocketMQ 死信佇列 | 消費者出現異常如何處理?

RocketMQ 死信佇列 | 消費者出現異常如何處理?

RocketMQ 重複消費問題 | 訂單系統核心流程引入冪等性機制一文中,我們討論了訊息重複消費的問題,比較好的方案是採用在消費側使用業務判斷法來保證介面的冪等性,這樣就能避免訊息重複消費的問題。

今天要討論的是消費者程式碼執行過程中出現異常,我們應該如何處理?

手動提交 offset

首先來看一段程式碼,Consumer 類是一個消費者類,它我們為它註冊了一個監聽器,在處理完訊息之後,會將訊息的狀態返回給 RocketMQ,執行成功返回的是訊息狀態是 CONSUME_SUCCESS

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");

        // 設定 NameServer 地址
        consumer.setNamesrvAddr("");
        // 訂閱 Topic
        consumer.subscribe("TopicTest", "*");
        // 這次回撥介面,接收訊息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 對訊息的處理,比如發放優惠券、積分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

畫一張圖來表示向 RocketMQ 提交訊息狀態的流程,如圖所示:

訊息者業務程式碼出現異常怎麼辦?

再來看一下消費者的程式碼中監聽器的部分,它說如果訊息處理成功,那麼就返回訊息狀態為 CONSUME_SUCCESS,也有可能發放優惠券、積分等操作出現了異常,比如說資料庫掛掉了。這個時候應該怎麼處理呢?

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 對訊息的處理,比如發放優惠券、積分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

我們可以把程式碼改一改,捕獲異常之後返回訊息的狀態為 RECONSUME_LATER 表示稍後重試。

// 這次回撥介面,接收訊息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    // 對訊息的處理,比如發放優惠券、積分等
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 萬一發生資料庫宕機等異常,返回稍後重試訊息的狀態
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            }
        });

重試佇列

這個時候,訊息會進入到 RocketMQ 的重試佇列中。

  • 比如說消費者所屬的訊息組名稱為AAAConsumerGroup
  • 其重試佇列名稱就叫做%RETRY%AAAConsumerGroup
  • 重試佇列中的訊息過一段時間會再次傳送給消費者,如果還是無法正常執行會再次進入重試佇列
  • 預設重試16次,還是無法執行,訊息就會從重試佇列進入到死信佇列

死信佇列

  • 重試佇列中的訊息重試16次任然無法執行,將會進入到死信佇列
  • 死信佇列的名字是 %DLQ%AAAConsumerGroup
  • 死信佇列中的訊息可以後臺開一個執行緒,訂閱%DLQ%AAAConsumerGroup,並不停重試

總結

本文從消費者的業務程式碼出現異常講起,介紹了 RocketMQ 的重試佇列和死信佇列:

  1. 程式碼正常執行返回訊息狀態為CONSUME_SUCCESS,執行異常返回RECONSUME_LATER
  2. 狀態為RECONSUME_LATER的訊息會進入到重試佇列,重試佇列的名稱為 %RETRY% + ConsumerGroupName
  3. 重試16次訊息任然沒有處理成功,訊息就會進入到死信佇列%DLQ% + ConsumerGroupName;