1. 程式人生 > 實用技巧 >RabbitMQ整合SpringBoot

RabbitMQ整合SpringBoot

SpringBoot 是在 Spring AMQP 上面再次封裝了一層,不需要再像 Spring AMQP 一樣注入各個元件 Bean, 只需要在配置檔案上配置好 RabbitMQ 屬性,SpringBoot 就可以自動注入了。

而使用 @RabbitListener 註解可以輕鬆實現消費端事件監聽處理。

一、專案配置

1.1 新增 Spring AMQP 依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>

1.2 配置屬性

/resources/application.properties 檔案中申明屬性:

# rabbitmq 配置
## 主機地址
spring.rabbitmq.host=111.231.83.100
## 埠號
spring.rabbitmq.port=5672
## 虛擬主機路徑
spring.rabbitmq.virtual-host=/
## 連線超時時間
spring.rabbitmq.connection-timeout=15000
## 消費者設定手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 消費者每次消費數量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消費者數量
spring.rabbitmq.listener.simple.max-concurrency=5

# 自定義屬性
## 佇列名稱
spring.rabbitmq.listener.order.queue.name=orderQueue
## 佇列是否持久化
spring.rabbitmq.listener.order.queue.durable=true
## 交換機名稱
spring.rabbitmq.listener.order.exchange.name=orderExchange
## 交換機是否持久化
spring.rabbitmq.listener.order.exchange.durable=true
## 交換機型別
spring.rabbitmq.listener.order.exchange.type=topic
## 是否忽略異常
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
## 路由 key
spring.rabbitmq.listener.order.routingKey=order#

相關屬性表:

屬性名 說明 預設值
spring.rabbitmq.address 客戶端連線的地址,有多個的時候使用逗號分隔,該地址可以是IP與Port的結合
spring.rabbitmq.cache.channel.checkout-timeout 當快取已滿時,獲取Channel的等待時間,單位為毫秒
spring.rabbitmq.cache.channel.size 快取中保持的Channel數量
spring.rabbitmq.cache.connection.mode 連線快取的模式 CHANNEL
spring.rabbitmq.cache.connection.size 快取的連線數
spring.rabbitmq.connnection-timeout 連線超時引數單位為毫秒:設定為“0”代表無窮大
spring.rabbitmq.dynamic 預設建立一個AmqpAdmin的Bean true
spring.rabbitmq.host RabbitMQ的主機地址 localhost
spring.rabbitmq.listener.acknowledge-mode 容器的acknowledge模式
spring.rabbitmq.listener.auto-startup 啟動時自動啟動容器 true
spring.rabbitmq.listener.concurrency 消費者的最小數量
spring.rabbitmq.listener.default-requeue-rejected 投遞失敗時是否重新排隊 true
spring.rabbitmq.listener.max-concurrency 消費者的最大數量
spring.rabbitmq.listener.prefetch 在單個請求中處理的訊息個數,他應該大於等於事務數量
spring.rabbitmq.listener.retry.enabled 不論是不是重試的釋出 false
spring.rabbitmq.listener.retry.initial-interval 第一次與第二次投遞嘗試的時間間隔 1000
spring.rabbitmq.listener.retry.max-attempts 嘗試投遞訊息的最大數量 3
spring.rabbitmq.listener.retry.max-interval 兩次嘗試的最大時間間隔 10000
spring.rabbitmq.listener.retry.multiplier 上一次嘗試時間間隔的乘數 1.0
spring.rabbitmq.listener.retry.stateless 不論重試是有狀態的還是無狀態的 true
spring.rabbitmq.listener.transaction-size 在一個事務中處理的訊息數量。為了獲得最佳效果,該值應設定為小於等於每個請求中處理的訊息個數,即spring.rabbitmq.listener.prefetch的值
spring.rabbitmq.password 登入到RabbitMQ的密碼
spring.rabbitmq.port RabbitMQ的埠號 5672
spring.rabbitmq.publisher-confirms 開啟Publisher Confirm機制 false
spring.rabbitmq.publisher-returns 開啟publisher Return機制 false
spring.rabbitmq.requested-heartbeat 請求心跳超時時間,單位為秒
spring.rabbitmq.ssl.enabled 啟用SSL支援 false
spring.rabbitmq.ssl.key-store 儲存Sspring.rabbitmq.listener.simple.acknowledge-modeSL證書的地址
spring.rabbitmq.ssl.key-store-password 訪問SSL證書的地址使用的密碼
spring.rabbitmq.ssl.trust-store SL的可信地址
spring.rabbitmq.ssl.trust-store-password 訪問SSL的可信地址的密碼
spring.rabbitmq.ssl.algorithm SSL演算法,預設使用Rabbit的客戶端演算法庫
spring.rabbitmq.template.mandatory 啟用強制資訊 false
spring.rabbitmq.template.receive-timeout receive()方法的超時時間 0
spring.rabbitmq.template.reply-timeout sendAndReceive()方法的超時時間 5000
spring.rabbitmq.template.retry.enabled 設定為true的時候RabbitTemplate能夠實現重試 false
spring.rabbitmq.template.retry.initial-interval 第一次與第二次釋出訊息的時間間隔 1000
spring.rabbitmq.template.retry.max-attempts 嘗試釋出訊息的最大數量 3
spring.rabbitmq.template.retry.max-interval 嘗試釋出訊息的最大時間間隔 10000
spring.rabbitmq.template.retry.multiplier 上一次嘗試時間間隔的乘數 1.0
spring.rabbitmq.username 登入到RabbitMQ的使用者名稱
spring.rabbitmq.virtual-host 連線到RabbitMQ的虛擬主機

1.3 開啟 @RabbitListener 註解

  • 通過 @EnableRabbit 註解來啟用 @RabbitListener

  • @RabbitListener 註解指定了目標方法來作為消費訊息的方法,通過註解引數指定所監聽的佇列或者 Binding 。使用 @RabbitListener 可以設定一個自己明確預設值的 RabbitListenerContainerFactory 物件。

@EnableRabbit
@SpringBootApplication
public class SpringbootApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}

二、簡單示例

2.1 配置交換機、佇列、繫結關係

@Configuration
public class RabbitMQConfig {
    @Bean
    public TopicExchange orderExchange(){
        return new TopicExchange("simpleExchange", true, false);
    }

    @Bean
    public Queue orderQueue(){
        return new Queue("simpleQueue", true);
    }
    @Bean
    public Binding beanBinding(TopicExchange orderExchange, Queue orderQueue) {
        return BindingBuilder
                // 建立佇列
                .bind(orderQueue)
                // 建立交換機
                .to(orderExchange)
                // 指定路由 Key
                .with("simple.#");
    }
}

2.2 建立消費者物件

@Component
public class Consume {

    @RabbitListener(queues = "simpleQueue")
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

2.3 建立生產者物件

@SpringBootTest
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendSimpleMessage() {
        String exchange = "simpleExchange";
        String routingKey = "simple.message";
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange");
    }
}

2.4 測試

先啟動應用,然後執行 sendSimpleMessage() 方法,觀察控制檯輸出:

--------------------------------------
消費端Payload: hello simpleExchange

說明 @RabbitListener 生效了。

三、@RabbitListener 高階使用

3.1 在 @RabbitListener 中申明繫結關係

消費者:

public class BindingConsume {


    @RabbitListener(
            queues = "simpleQueue")
    @RabbitListener(
            bindings =
            @QueueBinding(
                    exchange = @Exchange(value = "bindingExchange",
                            type = "topic"),
                    value = @Queue(value = "bindingQueue",
                            durable = "true"),
                    key = "binding.*"
            )
    )
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端Payload: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

生產者:

@SpringBootTest
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendBindingMessage() {
        String exchange = "bindingExchange";
        String routingKey = "binding.message";
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello bindingExchange");
    }
}    

先啟動應用,然後執行 sendBindingMessage() 方法,觀察控制檯輸出:

--------------------------------------
消費端Payload: hello bindingExchange

說明 @RabbitListener 生效了。

3.2 自定義訊息引數型別

建立訂單物件

// 記得需要實現序列化介面
public class Order implements Serializable{
    private String orderId;
    private BigDecimal amount;

    public Order() {

    }

    public Order(String orderId, BigDecimal amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }
    
    @Override
    public String toString() {
        return "Order{" +
                "orderId='" + orderId + '\'' +
                ", amount=" + amount +
                '}';
    }
}

建立消費者

@Component
public class OrderConsume {

    @RabbitListener(
            bindings =
            @QueueBinding(
                    value = @Queue(value = "orderQueue",
                            durable = "true"),
                    exchange = @Exchange(value = "orderExchange",
                            type = "topic"),
                    key = "order.*"
            )
    )
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端order: " + order.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

建立生產者

@SpringBootTest
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendOrderMessage() {
        String exchange = "orderExchange";
        String routingKey = "order.message";
        Order newOrder = new Order("10001", BigDecimal.valueOf(300));
        rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
    }
}    

先啟動應用,然後執行 sendOrderMessage() 方法,觀察控制檯輸出:

--------------------------------------
消費端order: Order{orderId='10001', amount=300}

3.3 @RabbitListener@RabbitHandler 的使用

@RabbitListener 除了在方法上面宣告,也可以在類上面申明 。當宣告在類上之後,就需要和@RabbitHandler 搭配使用:

  • @RabbitListener 可以標註在類上面,需配合 @RabbitHandler 註解一起使用;
  • @RabbitListener 標註在類上面表示當有收到訊息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換後的引數型別;
  • 當找不到對應的引數型別方法時將會丟擲 ListenerExecutionFailedException 異常,可以通過設定 @RabbitHandlerisDefault 屬性為 true 為它宣告一個預設處理方法。

建立 Consume:

@Component
@RabbitListener(
        bindings =
        @QueueBinding(
                value = @Queue(value = "handlerQueue",
                        durable = "true"),
                exchange = @Exchange(value = "handlerExchange",
                        type = "topic"),
                key = "handler.*"
        )
)
public class HandlerConsume {

    @RabbitHandler(isDefault = true)
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端預設處理: " + message.getPayload());
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

    @RabbitHandler
    public void onMessage(@Payload String msg, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端String引數: " + msg);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }


    @RabbitHandler
    public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消費端Order引數: " + order.toString());
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

建立生產者:

@SpringBootTest
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendHandlerStringMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 傳送字串訊息
        rabbitTemplate.convertAndSend(exchange, routingKey, "hello handlerExchange");
    }

    @Test
    public void sendHandlerOrderMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 傳送物件訊息
        Order newOrder = new Order("10001", BigDecimal.valueOf(300));
        rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
    }
    
    @Test
    public void sendHandlerArrayMessage() {
        String exchange = "handlerExchange";
        String routingKey = "handler.message";
        // 傳送字元陣列訊息
        String[] array = {"123","456","789"};
        rabbitTemplate.convertAndSend(exchange, routingKey, array);
    }
}    

先啟動應用,然後分別執行 sendHandlerStringMessage()sendHandlerOrderMessage()sendHandlerArrayMessage 方法,觀察控制檯輸出:

--------------------------------------
消費端String引數: hello handlerExchange
--------------------------------------
消費端Order引數: Order{orderId='10001', amount=300}
--------------------------------------
消費端預設處理: [Ljava.lang.String;@7c655a9a

說明 @RabbitListener@RabbitHandler 生效了。