1. 程式人生 > 其它 >RabbitMQ死信佇列

RabbitMQ死信佇列

RabbitMQ是流行的開源訊息佇列系統,使用erlang語言開發,由於其社群活躍度高,維護更新較快,效能穩定,深得很多企業的歡心(當然,也包括我現在所在公司【手動滑稽】)。

為了保證訂單業務的訊息資料不丟失,需要使用到RabbitMQ的死信佇列機制,當訊息消費發生異常時,將訊息投入死信佇列中。但由於對死信佇列的概念及配置不熟悉,導致曾一度陷入百度的汪洋大海,無法自拔,很多文章都看起來可行,但是實際上卻並不能幫我解決實際問題。最終,在官網文件中找到了我想要的答案,通過官網文件的學習,才發現對於死信佇列存在一些誤解,導致配置死信佇列之路困難重重。

於是本著記錄和分享的精神,將死信佇列的概念和配置完整的寫下來,以便幫助遇到同樣問題的朋友。

本文閱讀前,需要對RabbitMQ有一個簡單的瞭解,偏向實戰配置講解。

一、死信佇列是什麼

死信,在官網中對應的單詞為Dead Letter”,可以看出翻譯確實非常的簡單粗暴。那麼死信是個什麼東西呢?

“死信”是RabbitMQ中的一種訊息機制,當你在消費訊息時,如果佇列裡的訊息出現以下情況:

  • 訊息被否定確認,使用 channel.basicNack channel.basicReject ,並且此時requeue 屬性被設定為false
  • 訊息在佇列的存活時間超過設定的生存時間(TTL)時間。
  • 訊息佇列的訊息數量已經超過最大佇列長度。

那麼該訊息將成為“死信”。

“死信”訊息會被RabbitMQ

進行特殊處理,如果配置了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有配置,則該訊息將會被丟棄。

二、如何配置死信佇列

這一部分將是本文的關鍵,如何配置死信佇列呢?其實很簡單,大概可以分為以下步驟:

  1. 配置業務佇列,繫結到業務交換機上
  2. 為業務佇列配置死信交換機和路由key
  3. 為死信交換機配置死信佇列

注意,並不是直接宣告一個公共的死信佇列,然後所以死信訊息就自己跑到死信佇列裡去了。而是為每個需要使用死信的業務佇列配置一個死信交換機,這裡同一個專案的死信交換機可以共用一個,然後為每個業務佇列分配一個單獨的路由key

有了死信交換機和路由key後,接下來,就像配置業務佇列一樣,配置死信佇列,然後繫結在死信交換機上。也就是說,死信佇列並不是什麼特殊的佇列,只不過是繫結在死信交換機上的佇列。死信交換機也不是什麼特殊的交換機,只不過是用來接受死信的

交換機,所以可以為任何型別【DirectFanoutTopic】。一般來說,會為每個業務佇列分配一個獨有的路由key,並對應的配置一個死信佇列進行監聽,也就是說,一般會為每個重要的業務佇列配置一個死信佇列

有了前文這些陳述後,接下來就是驚險刺激的實戰環節,這裡省略了RabbitMQ環境的部署和搭建環節。

1、先建立一個Springboot專案。然後在pom檔案中新增 spring-boot-starter-amqp spring-boot-starter-web 的依賴,接下來建立一個Config類,這裡是關鍵:

@Configuration
public class RabbitMQConfig {
 
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
 
    // 宣告業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }
 
    // 宣告死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
 
    // 宣告業務佇列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這裡聲明當前佇列繫結的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這裡聲明當前佇列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }
 
    // 宣告業務佇列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這裡聲明當前佇列繫結的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這裡聲明當前佇列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }
 
    // 宣告死信佇列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }
 
    // 宣告死信佇列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }
 
    // 宣告業務佇列A繫結關係
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
 
    // 宣告業務佇列B繫結關係
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
 
    // 宣告死信佇列A繫結關係
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
 
    // 宣告死信佇列B繫結關係
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

這裡聲明瞭兩個Exchange,一個是業務Exchange,另一個是死信Exchange,業務Exchange下綁定了兩個業務佇列,業務佇列都配置了同一個死信Exchange,並分別配置了路由key,在死信Exchange下綁定了兩個死信佇列,設定的路由key分別為業務佇列裡配置的路由key

2、下面是配置檔案application.yml

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
          default-requeue-rejected: false
          acknowledge-mode: manual

這裡記得將default-requeue-rejected屬性設定為false

3、接下來,是業務佇列的消費程式碼

@Slf4j
@Component
public class BusinessMessageReceiver {
    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務訊息A:{}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            if (msg.contains("deadletter")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }
        if (!ack){
            log.error("訊息消費發生異常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
 
    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到業務訊息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

4、然後配置死信佇列的消費者:

@Component
public class DeadLetterMessageReceiver {
    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信訊息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
 
    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信訊息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5、最後,為了方便測試,寫一個簡單的訊息生產者,並通過controller層來生產訊息。

@Component
public class BusinessMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
 
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {
    @Autowired
    private BusinessMessageSender sender;
 
    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

一切準備就緒,啟動!可以從RabbitMQ的管理後臺中看到一共有四個佇列,除預設的Exchange外還有宣告的兩個Exchange

6、接下來,訪問一下url,來測試一下:

http://localhost:8080/rabbitmq/sendmsg?msg=msg

日誌:

收到業務訊息A:msg
收到業務訊息B:msg
表示兩個Consumer都正常收到了訊息。這代表正常消費的訊息,ack後正常返回。然後我們再來測試nck的訊息。
http://localhost:8080/rabbitmq/sendmsg?msg=deadletter

這將會觸發業務佇列ANCK,按照預期,訊息被NCK後,會拋到死信佇列中,因此死信佇列將會出現這個訊息,日誌如下:

收到業務訊息A:deadletter
訊息消費發生異常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
...
收到死信訊息A:deadletter
可以看到,死信佇列的Consumer接受到了這個訊息,所以流程到此為止就打通了。

三、死信訊息的變化

那麼“死信”被丟到死信佇列中後,會發生什麼變化呢?

如果佇列配置了引數 x-dead-letter-routing-key 的話,“死信”的路由key將會被替換成該引數對應的值。如果沒有設定,則保留該訊息原有的路由key

舉個栗子:

如果原有訊息的路由keytestA,被髮送到業務Exchage中,然後被投遞到業務佇列QueueA中,如果該佇列沒有配置引數x-dead-letter-routing-key,則該訊息成為死信後,將保留原有的路由keytestA,如果配置了該引數,並且值設定為testB,那麼該訊息成為死信後,路由key將會被替換為testB,然後被拋到死信交換機中。

另外,由於被拋到了死信交換機,所以訊息的Exchange Name也會被替換為死信交換機的名稱。

訊息的Header中,也會新增很多奇奇怪怪的欄位,修改一下上面的程式碼,在死信佇列的消費者中新增一行日誌輸出:

log.info("死信訊息properties:{}", message.getMessageProperties());

然後重新執行一次,即可得到死信訊息Header中被新增的資訊:

Header中看起來有很多資訊,實際上並不多,只是值比較長而已。下面就簡單說明一下Header中的值:

欄位名 含義
x-first-death-exchange 第一次被拋入的死信交換機的名稱
x-first-death-reason 第一次成為死信的原因,rejected:訊息在重新進入佇列時被佇列拒絕,由於default-requeue-rejected 引數被設定為falseexpired :訊息過期。maxlen : 佇列內訊息數量超過佇列最大容量
x-first-death-queue 第一次成為死信前所在佇列名稱
x-death 歷次被投入死信交換機的資訊列表,同一個訊息每次進入一個死信交換機,這個陣列的資訊就會被更新

四、死信佇列應用場景

通過上面的資訊,我們已經知道如何使用死信隊列了,那麼死信佇列一般在什麼場景下使用呢?

一般用在較為重要的業務佇列中,確保未被正確消費的訊息不被丟棄,一般發生消費異常可能原因主要有由於訊息資訊本身存在錯誤導致處理異常,處理過程中引數校驗異常,或者因網路波動導致的查詢異常等等,當發生異常時,當然不能每次通過日誌來獲取原訊息,然後讓運維幫忙重新投遞訊息(沒錯,以前就是這麼幹的= =)。通過配置死信佇列,可以讓未正確處理的訊息暫存到另一個佇列中,待後續排查清楚問題後,編寫相應的處理程式碼來處理死信訊息,這樣比手工恢復資料要好太多了。

五、總結

死信佇列其實並沒有什麼神祕的地方,不過是繫結在死信交換機上的普通佇列,而死信交換機也只是一個普通的交換機,不過是用來專門處理死信的交換機。

總結一下死信訊息的生命週期:

  1. 業務訊息被投入業務佇列
  2. 消費者消費業務佇列的訊息,由於處理過程中發生異常,於是進行了nck或者reject操作
  3. nckreject的訊息由RabbitMQ投遞到死信交換機中
  4. 死信交換機將訊息投入相應的死信佇列
  5. 死信佇列的消費者消費死信訊息

死信訊息是RabbitMQ為我們做的一層保證,其實我們也可以不使用死信佇列,而是在訊息消費異常時,將訊息主動投遞到另一個交換機中,當你明白了這些之後,這些ExchangeQueue想怎樣配合就能怎麼配合。比如從死信佇列拉取訊息,然後傳送郵件、

簡訊、釘釘通知來通知開發人員關注。或者將訊息重新投遞到一個佇列然後設定過期時間,來進行延時消費。