1. 程式人生 > 其它 >RabbitMQ重試機制

RabbitMQ重試機制

技術標籤:RabbitMQ我の原創rabbitmq

1、RabbitMQ重試機制的簡介

RabbitMQ 不會為未確認的訊息設定過期時間,它判斷此訊息是否需要重新投遞給消費者的唯一依據是消費該訊息連線是否已經斷開,這個設定的原因是 RabbitMQ 允許消費者消費一條訊息的時間可以很久很久。

RabbitMQ 的 Web 管理平臺上可以看到當前佇列中的 “Ready” 狀態和 “Unacknowledged” 狀態的訊息數,分別對應等待投遞給消費者的訊息數和已經投遞給消費者但是未收到確認訊號的訊息數。如下圖:

注意事項:

如果在處理訊息的過程中,消費者的伺服器在處理訊息的時候出現異常,那麼可能這條正在處理的訊息就沒有完成訊息消費,資料就會丟失。為了確保資料不會丟失,RabbitMQ 支援訊息確認-ACK。

如果忘記了訊息確認,那麼後果很嚴重。當 Consumer 退出時候,Message 會一直重新分發。然後 RabbitMQ 會佔用越來越多的內容,由於 RabbitMQ 會長時間執行,因此這個"記憶體洩漏"是致命的。

RabbitMQ 重試機制核心配置:

spring:
  # 專案名稱
  application:
    name: rabbitmq-consumer
  # RabbitMQ服務配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 重試機制
    listener:
      simple:
        retry:
          enabled: true #是否開啟消費者重試
          max-attempts: 5 #最大重試次數
          initial-interval: 5000ms #重試間隔時間(單位毫秒)
          max-interval: 1200000ms #重試最大時間間隔(單位毫秒)
          multiplier: 2 #間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設定的最大間隔時間

2、RabbitMQ重試機制的實現

下面將通過示例來講解RabbitMQ 重試機制的實現。首先需要建立兩個 SpringBoot 專案並整合 RabbitMQ 客戶端。

2.1 實現訊息傳送端

(1)建立第一個 SpringBoot 專案( rabbitmq-provider 訊息傳送專案)。

在pom.xml配置資訊檔案中,新增相關依賴檔案:

<!-- AMQP客戶端 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.1</version>
</dependency>

在application.yml 配置檔案中配置 RabbitMQ 服務:

spring:
  # 專案名稱
  application:
    name: rabbitmq-provider
  # RabbitMQ服務配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

(2)配置佇列

在 rabbitmq-provider(訊息傳送專案)中,配置佇列名稱,並將佇列交由 IoC 管理,程式碼如下:

package com.pjb.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置類
 * @author pan_junbiao
 **/
@Configuration
public class RabbitMqConfig
{
    public static final String QUEUE_NAME = "queue_name"; //佇列名稱
    public static final String EXCHANGE_NAME = "exchange_name"; //交換器名稱
    public static final String ROUTING_KEY = "routing_key"; //路由鍵

    /**
     * 佇列
     */
    @Bean
    public Queue queue()
    {
        /**
         * 建立佇列,引數說明:
         * String name:佇列名稱。
         * boolean durable:設定是否持久化,預設是 false。durable 設定為 true 表示持久化,反之是非持久化。
         * 持久化的佇列會存檔,在伺服器重啟的時候不會丟失相關資訊。
         * boolean exclusive:設定是否排他,預設也是 false。為 true 則設定佇列為排他。
         * boolean autoDelete:設定是否自動刪除,為 true 則設定佇列為自動刪除,
         * 當沒有生產者或者消費者使用此佇列,該佇列會自動刪除。
         * Map<String, Object> arguments:設定佇列的其他一些引數。
         */
        return new Queue(QUEUE_NAME, true, false, false, null);
    }

    /**
     * Direct交換器
     */
    @Bean
    public DirectExchange exchange()
    {
        /**
         * 建立交換器,引數說明:
         * String name:交換器名稱
         * boolean durable:設定是否持久化,預設是 false。durable 設定為 true 表示持久化,反之是非持久化。
         * 持久化可以將交換器存檔,在伺服器重啟的時候不會丟失相關資訊。
         * boolean autoDelete:設定是否自動刪除,為 true 則設定佇列為自動刪除,
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 繫結
     */
    @Bean
    Binding binding(DirectExchange exchange, Queue queue)
    {
        //將佇列和交換機繫結, 並設定用於匹配鍵:routingKey
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}

(3)建立傳送者

在 rabbitmq-provider(訊息傳送專案)中,建立傳送者,利用rabbitTemplate.convertAndSend() 方法傳送訊息,程式碼如下:

package com.pjb;

import com.pjb.config.RabbitMqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * RabbitMq測試類
 * @author pan_junbiao
 **/
@SpringBootTest
public class RabbitMqTest
{
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage()
    {
        String message = "您好,歡迎訪問 pan_junbiao的部落格";
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message);
        System.out.println("訊息傳送成功!");
    }
}

2.2 實現訊息接收端

(1)建立第二個 SpringBoot 專案( rabbitmq-consumer 訊息接收專案)。

在pom.xml配置資訊檔案中,新增相關依賴檔案:

<!-- AMQP客戶端 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.1</version>
</dependency>

在application.yml 配置檔案中配置 RabbitMQ 服務,這裡需要配置RabbitMQ 重試機制:

spring:
  # 專案名稱
  application:
    name: rabbitmq-consumer
  # RabbitMQ服務配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 重試機制
    listener:
      simple:
        retry:
          enabled: true #是否開啟消費者重試
          max-attempts: 5 #最大重試次數
          initial-interval: 5000ms #重試間隔時間(單位毫秒)
          max-interval: 1200000ms #重試最大時間間隔(單位毫秒)
          multiplier: 2 #間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設定的最大間隔時間

(2)建立接收者

在 rabbitmq-consumer(訊息接收專案)中,建立建立接收者,注意,傳送者和接收者的 Queue 名稱必須一致,否則不能接收訊息。

接收者接收到訊息後,列印輸出訊息,然後程式丟擲執行時異常,觀察現象。程式碼如下:

package com.pjb.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 接收者
 * @author pan_junbiao
 **/
@Component
@RabbitListener(queues="queue_name")
public class Receiver
{
    @RabbitHandler
    public void process(String message)
    {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收訊息: " + message + " 接收時間:" + sdf.format(new Date()));
        throw new RuntimeException();
    }
}

特別注意:

如果在訊息接收端的 application.yml 配置檔案中沒有新增RabbitMQ 重試機制的相關配置,當接收端收到訊息後程序丟擲異常,那麼傳送端將得不到訊息確認(ACK),此時傳送端將會迴圈的傳送訊息,最終導致記憶體溢位。

執行結果:

從上述執行結果來看,當接收端重試5次後,將訊息確認(ACK)。