1. 程式人生 > 其它 >HM-SpringCloud微服務系列12.1【訊息可靠性】

HM-SpringCloud微服務系列12.1【訊息可靠性】

HM-SpringCloud微服務系列12:服務非同步通訊RabbitMQ-高階篇



回顧一下MQ的訊息傳送流程:訊息msg從生產者publisher傳送到交換機exchange,再到佇列queue,最後到消費者consumer

其中的每一步都可能導致訊息丟失,常見的丟失原因包括:

  • 傳送時丟失:
    • 生產者傳送的訊息未送達exchange
    • 訊息到達exchange後未到達queue
  • MQ宕機,queue將訊息丟失
  • consumer接收到訊息後未消費就宕機

針對這些問題,RabbitMQ分別給出瞭解決方案:

  • 生產者確認機制
  • mq持久化
  • 消費者確認機制
  • 失敗重試機制

下面通過案例來演示每一個步驟。首先匯入課前資料提供的demo工程:


虛擬機器MQ環境準備:

1. 生產者訊息確認

1.1 publisher confirm機制

RabbitMQ提供了publisher confirm機制來避免訊息傳送到MQ過程中丟失。這種機制必須給每個訊息指定一個唯一ID。訊息傳送到MQ以後,會返回一個結果給傳送者,表示訊息是否處理成功。返回結果有兩種方式:

  • publisher-confirm,傳送者確認
    • 訊息成功投遞到交換機,返回ack
    • 訊息未投遞到交換機,返回nack
  • publisher-return,傳送者回執
    • 訊息投遞到交換機了,但是沒有路由到佇列。返回ACK,及路由失敗原因。

1.2 SpringAMQP實現生產者確認

1.2.1 修改配置

修改publisher服務中的application.yml檔案,新增下面的內容:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 非同步回撥模式
    publisher-returns: true
    template:
      mandatory: true

說明:

  • publish-confirm-type:開啟publisher-confirm,這裡支援兩種型別:
    • simple:同步等待confirm結果,直到超時
    • correlated:非同步回撥,定義ConfirmCallback,MQ返回結果時會回撥這個ConfirmCallback
  • publish-returns:開啟publish-return功能,同樣是基於callback機制,不過是定義ReturnCallback
  • template.mandatory:定義訊息路由失敗時的策略。true,則呼叫ReturnCallback;false:則直接丟棄訊息

1.2.2 定義Return回撥

每個RabbitTemplate只能配置一個ReturnCallback,因此需要在專案載入時配置:

修改publisher服務,新增一個配置類:

package cn.itcast.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware { // 實現Bean工廠通知

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 獲取RabbitTemplate物件
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 配置ReturnCallback(lambda表示式本質是匿名內部類。匿名內部類所實現介面中只有一個方法時可以用lambda替換)
        /*rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

            }
        }); */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 訊息投遞失敗,記錄日誌
            log.error("訊息傳送到佇列失敗,響應碼:{},失敗原因:{},交換機:{},路由key:{},訊息:{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有業務需要,可以在此之下重發訊息
        });
    }
}


交換機與佇列手動繫結:

之前課程學MQ時是這樣傳送訊息的:

1.2.3 定義ConfirmCallback

ConfirmCallback可以在傳送訊息時指定,因為每個業務處理confirm成功或失敗的邏輯不一定相同。

在publisher服務的cn.itcast.mq.spring.SpringAmqpTest類中,定義一個單元測試方法testSendMessage2SimpleQueue2():

package cn.itcast.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        String routingKey = "simple.test";
        // 1. 準備訊息
        String message = "hello, spring amqp!";
        // 2. 傳送訊息
        rabbitTemplate.convertAndSend("amq.topic", routingKey, message);
    }

    @Test
    public void testSendMessage2SimpleQueue2() throws InterruptedException {
        // 1. 準備訊息
        String message = "hello, spring amqp!";

        // 2. 準備CorrelationData
        // 2.1 訊息ID(全域性唯一的訊息ID,需要封裝到CorrelationData中)
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 2.2 準備ConfirmCallback
        /*correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
            @Override
            public void onSuccess(CorrelationData.Confirm result) {}
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {}
        });*/
        correlationData.getFuture().addCallback(result -> {
            // 判斷結果
            if (result.isAck()) {
                //ACK:訊息成功
                log.debug("訊息成功投遞到交換機!訊息ID:{}", correlationData.getId());
            }else {
                //NACK:訊息失敗
                log.error("訊息投遞到交換機失敗!訊息ID:{}", correlationData.getId());
                // 如果有業務需要,可以在此之下重發訊息
            }
        }, ex -> {
            // 訊息投遞失敗,記錄日誌
            log.error("訊息傳送異常!", ex);
            // 如果有業務需要,可以在此之下重發訊息
        });

        // 3. 傳送訊息
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
//        rabbitTemplate.convertAndSend("aaaamq.topic", "simple.test", message, correlationData); //故意填錯交換機模擬投遞失敗
//        rabbitTemplate.convertAndSend("amq.topic", "aaa.simple.test", message, correlationData); //故意填錯路由模擬投遞異常
    }
}

1.2.4 測試

成功情況:

失敗情況1:

失敗情況2:

1.3 小結

SpringAMQP中處理訊息確認的幾種情況:

  • publisher-comfirm:

    • 訊息成功傳送到exchange,返回ack
    • 訊息傳送失敗,沒有到達交換機,返回nack
    • 訊息傳送過程中出現異常,沒有收到回執
  • 訊息成功傳送到exchange,但沒有路由到queue,呼叫ReturnCallback

2. 訊息持久化

2.1 持久化機制

生產者確認可以確保訊息投遞到RabbitMQ的佇列中,但是訊息傳送到RabbitMQ以後,如果突然宕機,也可能導致訊息丟失。

要想確保訊息在RabbitMQ中安全儲存,必須開啟訊息持久化機制。

  • 交換機持久化
  • 佇列持久化
  • 訊息持久化

非持久化情景演示:

接1.2.4最後結果檢視

重啟虛擬機器docker中的mq服務

再來檢視管理介面,發現系統的amq.topic交換機還在;但剛剛自己建立的sample.queue佇列沒了,那訊息也隨之沒了

2.2 實現持久化

2.2.1 交換機持久化

RabbitMQ中交換機預設是非持久化的,mq重啟後就丟失。

SpringAMQP中可以通過程式碼指定交換機持久化:

@Bean
public DirectExchange simpleDirect() {
    // 三個引數:交換機名稱、是否持久化、當沒有queue與其繫結時是否自動刪除
    return new DirectExchange("simple.direct", true, false);
}

2.2.2 佇列持久化

RabbitMQ中佇列預設是非持久化的,mq重啟後就丟失。

SpringAMQP中可以通過程式碼指定交換機持久化:

@Bean
public Queue simpleQueue() {
    // 使用QueueBuilder構建佇列,durable()持久化;nonDurable()非持久化
    return QueueBuilder.durable("simple.queue").build(); 
}

啟動consumer服務測試

可以在RabbitMQ控制檯看到持久化的交換機、佇列都會帶上D的標示:

停止consumer服務,手動傳送訊息測試

既然佇列也已經持久化了,現在重啟一下mq,看看是否訊息還在

重啟完成,發現交換機、佇列還在,不過佇列裡的訊息沒了

說明訊息未實現持久化

2.2.3 訊息持久化

利用SpringAMQP傳送訊息時,可以設定訊息的屬性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java程式碼指定:

@Test
public void testDurableMsg() {
    // 1. 準備訊息
    Message message = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8))
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //NON_PERSISTENT是非持久
        .build();
    // 2. 傳送訊息
    rabbitTemplate.convertAndSend("simple.queue", message);
    log.debug("訊息傳送成功");
}

mode=2即為持久化

重啟mq

重新整理mq管理介面,可以看到訊息還在,證明訊息持久化了

2.2.4 實際上上面三者預設持久化(原始碼分析)

事實上,預設情況下,由SpringAMQP宣告的交換機、佇列都是持久化的。

預設情況下,SpringAMQP發出的任何訊息都是持久化的,不用特意指定。

3. 消費者訊息確認

RabbitMQ是閱後即焚機制,RabbitMQ確認訊息被消費者消費後會立刻刪除。

而RabbitMQ是通過消費者回執來確認消費者是否成功處理訊息的:消費者獲取訊息後,應該向RabbitMQ傳送ACK回執,表明自己已經處理訊息。

設想這樣的場景:

  • 1)RabbitMQ投遞訊息給消費者
  • 2)消費者獲取訊息後,返回ACK給RabbitMQ
  • 3)RabbitMQ刪除訊息
  • 4)消費者宕機,訊息尚未處理

這樣,訊息就丟失了。因此消費者返回ACK的時機非常重要。

而SpringAMQP則允許配置三種確認模式:

  • manual:手動ack,需要在業務程式碼結束後,呼叫api傳送ack。

  • auto:自動ack,由spring(AOP-環繞增強)監測listener程式碼是否出現異常,沒有異常則返回ack;丟擲異常則返回nack

  • none:關閉ack,MQ假定消費者獲取訊息後會成功處理,因此訊息投遞後立即被刪除

由此可知:

  • none模式下,訊息投遞是不可靠的,可能丟失
  • auto模式類似事務機制,出現異常時返回nack,訊息回滾到mq;沒有異常,返回ack
  • manual:自己根據業務情況,判斷什麼時候該ack

一般,我們都是使用預設的auto即可。

3.1 演示none模式

修改consumer服務的application.yml檔案

修改consumer服務的SpringRabbitListener類中的方法,模擬一個訊息處理異常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    System.out.println("消費者接收到simple.queue的訊息:【" + msg + "】");
    System.out.println(1/0); //模擬異常:手動錯誤
    log.info("消費者處理訊息成功!");
}

測試可以發現,當訊息處理拋異常時,訊息依然被RabbitMQ刪除了。


當前,simple.queue中有兩條訊息

debug啟動Consumer服務

啟動成功立馬進入了斷點,然後現在檢視mq管理頁面發現simple.queue中訊息沒了

但可以看到,消費者這裡還有訊息;接著除錯

可惜的是,拋異常了-->消費者沒了,然後訊息也丟失了

3.2 演示auto模式

修改consumer服務的application.yml檔案,把確認機制修改為auto:

先在simple.queue中手動準備一條訊息

在異常位置打斷點,再次傳送訊息,程式卡在斷點時,可以發現此時訊息狀態為unack(未確定狀態):

啟動成功立刻進入斷點,此時可以看到mq管理介面中顯示訊息狀態由ready變為unacked

丟擲異常後,因為Spring會自動返回nack,所以訊息恢復至Ready狀態,並且沒有被RabbitMQ刪除:

報錯後,訊息被重新投遞了,並沒有消失

4. 消費失敗重試機制

當消費者出現異常後,訊息會不斷requeue(重入隊)到佇列,再重新發送給消費者,然後再次異常,再次requeue,無限迴圈,導致mq的訊息處理飆升,帶來不必要的壓力:

怎麼辦呢?

4.1 本地重試

我們可以利用Spring的retry機制,在消費者出現異常時利用本地重試,而不是無限制的requeue到mq佇列。

修改consumer服務的application.yml檔案,新增內容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者失敗重試
          initial-interval: 1000 # 初始的失敗等待時長為1秒
          multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval
          max-attempts: 3 # 最大重試次數
          stateless: true # true無狀態(預設);false有狀態。如果業務中包含事務,這裡改為false,不然會導致事務失效

重啟consumer服務,重複之前的測試。可以發現:

  • 在重試3次後,SpringAMQP會丟擲異常AmqpRejectAndDontRequeueException,說明本地重試觸發了
  • 檢視RabbitMQ控制檯,發現訊息被刪除了,說明最後SpringAMQP返回的是ack,mq刪除訊息了

結論:

  • 開啟本地重試時,訊息處理過程中丟擲異常,不會requeue到佇列,而是在消費者本地重試
  • 重試達到最大次數後,Spring會返回ack,訊息會被丟棄

4.2 失敗策略RepublishMessageRecoverer

在之前的測試中,達到最大重試次數後,訊息會被丟棄,這是由Spring內部機制決定的。

在開啟重試模式後,重試次數耗盡,如果訊息依然失敗,則需要有MessageRecovery介面來處理,它包含三種不同的實現:

  • RejectAndDontRequeueRecoverer:重試耗盡後,直接reject,丟棄訊息。預設就是這種方式

  • ImmediateRequeueMessageRecoverer:重試耗盡後,返回nack,訊息重新入隊

  • RepublishMessageRecoverer:重試耗盡後,將失敗訊息投遞到指定的交換機

這種情況下第一種方案RejectAndDontRequeueRecoverer會直接將訊息丟棄;而第二種方案ImmediateRequeueMessageRecoverer則會將訊息重新塞入simple.queue。

比較優雅的一種處理方案是RepublishMessageRecoverer,失敗後將訊息投遞到一個指定的,專門存放異常訊息的佇列,後續由人工集中處理。

4.2.1 在consumer服務中定義處理失敗訊息的交換機和佇列

4.2.2 定義一個RepublishMessageRecoverer關聯佇列和交換機

4.2.3 測試

完整程式碼

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ErrorMsgConfig {

    /**
     * 異常訊息交換機
     * @return
     */
    @Bean
    public DirectExchange errorMsgExchange() {
        return new DirectExchange("error.direct");
    }

    /**
     * 異常訊息佇列
     * @return
     */
    @Bean
    public Queue errorMsgQueue() {
        return new Queue("error.queue");
    }

    /**
     * 繫結佇列&交換機
     * @return
     */
    @Bean
    public Binding errorMsgBinding() {
        // 利用BindingBuilder,先指定佇列,再指定交換機,最後寫路由key
        return BindingBuilder
                .bind(errorMsgQueue())
                .to(errorMsgExchange())
                .with("error");
    }

    /**
     * 關聯佇列&交換機
     * @return
     */
    @Bean
    public MessageRecoverer republishMsgRecover(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
        // 注意此處的交換機名字和路由key一定要與上面定義的一致
    }
}

可以看到,不僅訊息被傳遞過來了,而且異常資訊也被完整呈現

推薦這種方案

5. 總結

[面試題] 如何確保RabbitMQ訊息的可靠性?

  • 開啟生產者確認機制,確保生產者的訊息能到達佇列
  • 開啟持久化功能,確保訊息未消費前在佇列中不會丟失
  • 開啟消費者確認機制為auto,由spring確認訊息處理成功後完成ack
  • 開啟消費者失敗重試機制,並設定MessageRecoverer,多次重試失敗後將訊息投遞到異常交換機,交由人工處理