1. 程式人生 > 其它 >HM-SpringCloud微服務系列12.2【死信交換機】

HM-SpringCloud微服務系列12.2【死信交換機】

1. 初識死信交換機

1.1 什麼是死信交換機

當一個佇列中的訊息滿足下列情況之一時,可以成為死信(dead letter):

  • 消費者使用basic.reject或 basic.nack宣告消費失敗,並且訊息的requeue引數設定為false
  • 訊息是一個過期訊息,超時無人消費
  • 要投遞的佇列訊息滿了,無法投遞

如果這個包含死信的佇列配置了dead-letter-exchange屬性,指定了一個交換機,那麼佇列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,檢查DLX)。

如圖,假如消費者採用預設的失敗重試策略(在內部不斷重試,當重試次數耗盡時,就reject。被拒絕的訊息一般會被丟棄,若不想丟棄訊息需要給佇列繫結死信交換機)

一個訊息被消費者拒絕了,變成了死信:

因為simple.queue通過dead-letter-exchange屬性綁定了死信交換機 dl.direct,因此死信會投遞給這個交換機:

如果這個死信交換機也綁定了一個佇列,則訊息最終會進入這個存放死信的佇列:

另外,佇列將死信投遞給死信交換機時,必須知道兩個資訊:

  • 死信交換機名稱
  • 死信交換機與死信佇列繫結的RoutingKey

這樣才能確保投遞的訊息能到達死信交換機,並且正確的路由到死信佇列。

在此,與上節<(https://www.cnblogs.com/yppah/p/16244809.html>中失敗策略RepublishMessageRecoverer的異常訊息處理進行一下對比,可以發現:前者是消費者主動投遞失敗訊息到交換機,後者(本節死信交換機)是佇列主動投遞失敗訊息到交換機。

1.2 利用死信交換機接收死信(拓展|未講)

在失敗重試策略中,預設的RejectAndDontRequeueRecoverer會在本地重試次數耗盡後,傳送reject給RabbitMQ,訊息變成死信,被丟棄。

我們可以給simple.queue新增一個死信交換機,給死信交換機繫結一個佇列。這樣訊息變成死信後也不會丟棄,而是最終投遞到死信交換機,路由到與死信交換機繫結的佇列。

在consumer服務中,定義一組死信交換機、死信佇列:

// 宣告普通的 simple.queue佇列,並且為其指定死信交換機:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定佇列名稱,並持久化
        .deadLetterExchange("dl.direct") // 指定死信交換機
        .build();
}
// 宣告死信交換機 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 宣告儲存死信的佇列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 將死信佇列 與 死信交換機繫結
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

1.3 小結

什麼樣的訊息會成為死信?

  • 訊息被消費者reject或者返回nack
  • 訊息超時未消費
  • 佇列滿了

死信交換機的使用場景是什麼?

  • 如果佇列綁定了死信交換機,死信會投遞到死信交換機
  • 可以利用死信交換機收集所有消費者處理失敗的訊息(死信),交由人工處理,進一步提高訊息佇列的可靠性

如何給佇列繫結死信交換機?

  • 給佇列設定dead-letter-exchange屬性,指定一個交換機

  • 給佇列設定dead-letter-routing-key屬性,設定死信交換機與死信佇列的RoutingKey

2. TTL

2.1 什麼是TTL

TTL,也就是Time-To-Live。

如果一個佇列中的訊息如果超時未消費,則會變為死信,超時分為兩種情況:

  • 訊息所在的佇列設定了超時時間
  • 訊息本身設定了超時時間

2.2 接收超時死信的死信交換機

在consumer服務的SpringRabbitListener中,定義一個新的消費者,並且宣告 死信交換機、死信佇列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.queue", durable = "true"),
    exchange = @Exchange(name = "dl.direct"),
    key = "dl"
))
public void listenDlQueue(String msg) {
    log.debug("消費者接收到dl.queue的延遲訊息:{}", msg);
}

2.3 宣告一個佇列,並且指定TTL

在consumer服務的config包下新建TTLMsgConfig配置類

package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLMsgConfig {

    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue() {
        return QueueBuilder
                .durable("ttl.queue") // 指定佇列名稱,並持久化
                .ttl(10000) // 設定佇列的超時時間: 1W ms=10s
                .deadLetterExchange("dl.direct") // 指定死信交換機
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

啟動consumer服務

在publisher服務的SpringAmqpTest測試類中新增測試方法:

@Test
public void testTTlMsg() {
    // 1. 準備訊息
    Message message = MessageBuilder
        .withBody("hello ttl".getBytes(StandardCharsets.UTF_8))
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //NON_PERSISTENT是非持久
        .build();
    // 2. 傳送訊息
    rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
    log.debug("訊息傳送成功");
}

此時,傳送的訊息並沒有設定超時時間,僅在佇列ttl.queue中設定了10秒延遲

執行testTTlMsg()

2.4 傳送訊息時,設定TTL

這次,傳送與接收的延遲只有5秒。說明當佇列、訊息都設定了TTL時,任意一個到期就會成為死信。

2.5 小結

訊息超時的兩種方式是?

  • 給佇列設定ttl屬性,進入佇列後超過ttl時間的訊息變為死信
  • 給訊息設定ttl屬性,佇列接收到訊息超過ttl時間後變為死信

如何實現傳送一個訊息20秒後消費者才收到訊息?

  • 給訊息的目標佇列指定死信交換機
  • 將消費者監聽的佇列繫結到死信交換機
  • 傳送訊息時給訊息設定超時時間為20秒

3. 延遲佇列

利用TTL結合死信交換機,我們實現了訊息發出後,消費者延遲收到訊息的效果。這種訊息模式就稱為延遲佇列(Delay Queue)模式。

延遲佇列的使用場景包括:

  • 延遲傳送簡訊
  • 使用者下單,如果使用者在15 分鐘內未支付,則自動取消
  • 預約工作會議,20分鐘後自動通知所有參會人員

因為延遲佇列的需求非常多,所以RabbitMQ的官方也推出了一個外掛,原生支援延遲佇列效果。

這個外掛就是DelayExchange外掛。參考RabbitMQ的外掛列表頁面:https://www.rabbitmq.com/community-plugins.html

使用方式可以參考官網地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

3.1 安裝DelayExchange外掛

3.1.1 下載外掛

以去對應的GitHub頁面下載3.8.9版本的外掛,地址為https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9這個對應RabbitMQ的3.8.5以上版本。

課前資料也提供了下載好的外掛:

3.1.2 上傳外掛

因為我們是基於Docker安裝,所以需要先檢視RabbitMQ的外掛目錄對應的資料卷。

我們之前設定的RabbitMQ的資料卷名稱為mq-plugins,所以我們使用下面命令檢視資料卷:

docker volume inspect mq-plugins

發現當前使用的mq是剛開始學習mq是安裝的,當時並沒有指定資料卷https://www.cnblogs.com/yppah/p/15832492.html

現在重新單機部署一個mq

上傳到虛擬機器中後,使用命令載入映象即可:

docker load -i mq.tar

停止之前執行的mq

執行下面的命令來執行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=haifei \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq2 \
 --hostname mq2 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

再來檢視mq2的資料卷

將外掛上傳到這個目錄即可:

3.1.3 安裝外掛

需要進入MQ容器內部來執行安裝。我的容器名為mq2,所以執行下面命令:

docker exec -it mq2 bash

進入容器內部後,執行下面命令開啟外掛:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.2 DelayExchange原理

DelayExchange需要將一個交換機宣告為delayed型別。當我們傳送訊息到delayExchange時,流程如下:

  • 接收訊息
  • 判斷訊息是否具備x-delay屬性
  • 如果有x-delay屬性,說明是延遲訊息,持久化到硬碟,讀取x-delay值,作為延遲時間
  • 返回routing not found結果給訊息傳送者
  • x-delay時間到期後,重新投遞訊息到指定佇列

3.3 使用DelayExchange(介面操作方式|瞭解)


3.4 SpringAMQP使用延遲佇列外掛(程式碼方式|掌握)

外掛的使用也非常簡單:宣告一個交換機,交換機的型別可以是任意型別,只需要設定delayed屬性為true即可,然後宣告佇列與其繫結即可。

3.4.1 宣告DelayExchange交換機

在consumer服務的SpringRabbitListener類中新增消費者

基於註解方式(推薦|簡單方便):

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delay.queue", durable = "true"),
    exchange = @Exchange(name = "delay.direct", delayed = "true"),
    key = "delay"
))
public void listenDelayQueue(String msg) {
    log.debug("消費者接收到delay.queue的延遲訊息:{}", msg);
}

也可以基於@Bean的方式:

3.4.2 傳送訊息

在publisher服務的SpringAmqpTest類中新添測試方法

傳送訊息時,一定要攜帶x-delay屬性,指定延遲的時間:

@Test
public void testSendDelayMsg() throws InterruptedException {
    Message message = MessageBuilder
        .withBody("hello ttl".getBytes(StandardCharsets.UTF_8))
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //NON_PERSISTENT是非持久
        .setHeader("x-delay", 5000) //訊息延遲5秒
        .build();
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
    log.info("傳送訊息成功");
}

3.4.2 測試

先重啟comsumer服務

然後執行單元測試方法

檢視消費者的訊息接收情況

上面測試方法中的報錯無關緊要,訊息沒問題,下面在publisher服務的CommonConfig中判斷一下進行處理該報錯

再次測試,發現不會報錯了

3.5 小結

延遲佇列外掛的使用步驟包括哪些?

  • 宣告一個交換機,新增delayed屬性為true

  • 傳送訊息時,新增x-delay頭,值為超時時間