1. 程式人生 > >Spring Boot 構建應用——整合訊息中介軟體 RabbitMQ

Spring Boot 構建應用——整合訊息中介軟體 RabbitMQ

RabbitMQ 是訊息中介軟體的一種,實現了 AMQP 標準。訊息中介軟體的工作過程可以用生產者-消費者模型來表示。生產者發訊息到訊息佇列中去,消費者監聽指定的訊息佇列,並且當訊息佇列收到訊息之後,接收訊息佇列傳來的訊息,並且給予相應的處理。訊息佇列常用於分散式系統之間互相資訊的傳遞。
對於 RabbitMQ 來說,這三個基本模組以外,還添加了一個模組,交換機 (Exchange)。它使得生產者和訊息佇列之間產生了隔離,生產者將訊息傳送給交換機,而交換機則根據排程策略把相應的訊息轉發給對應的訊息佇列。
交換機的主要作用是接收相應的訊息並且繫結到指定的佇列,有三種類型的交換機:

交換機 說明
direct 預設的交換機 (一對一)。即建立訊息佇列的時候,指定一個BindingKey,當生產者傳送訊息的時候,指定對應的Key,當Key和訊息佇列的BindingKey一致的時候,訊息將會被髮送到該訊息佇列中。
fanout 路由廣播的形式,將會把訊息發給繫結它的全部佇列,即便設定了key,也會被忽略。生產者其實僅關注Exchange與Route Key, 消費者僅關注Queue
topic Topic轉發資訊主要是依據萬用字元,佇列和交換機的繫結主要是依據一種模式(萬用字元+字串),而當傳送訊息的時候,只有指定的Key和該模式相匹配的時候,訊息才會被髮送到該訊息佇列中。

RabbitMQ交換機

Spring Boot 整合 RabbitMQ 非常簡單,需要新增 Maven 起步依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

一些核心類如下:

org.springframework.amqp.core.Queue: 佇列
org.springframework.amqp.core.Binding: 建立交換機與佇列的繫結關係
org.springframework.amqp.core.DirectExchange: Direct交換機
org.springframework.amqp.core.TopicExchange: Topic交換機
org.springframework.amqp.core.FanoutExchange: Fanout交換機
org.springframework.amqp.support.converter.MessageConverter: 訊息轉換器, 如將Java類轉換JSON型別傳送至Broker, 從Broker處獲取JSON訊息轉換為Java型別
org.springframework.amqp.core.AmqpTemplate: 多用於生產者端釋出訊息
org.springframework.amqp.core.AmqpAdmin: 用於Exchange, Queue等的動態管理

然後通過 application.properties 中的 spring.rabbitmq.* 字首配置屬性(生產者、消費者應用都一樣):

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/admin

下面看一下在三種類型的交換機下的不同實現。

1.Spring Boot整合RabbitMQ (direct交換機)

在 Java 配置類中註冊 bean(生產者和消費者的是一樣的,因為監聽的是同一個佇列,所以佇列名要先約定好):

@Configuration
public class RabbitConfiguration {
    //佇列名
    public static final String TRADE_QUEUE = "funds";
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public Queue queue() {
        return new Queue(TRADE_QUEUE);
    }
}

1.生產者生產訊息

在 SpringBoot 中,我們使用 AmqpTemplate 去傳送訊息(呼叫 send 方法即傳送):

@Component
public class HelloProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send() {
        amqpTemplate.convertAndSend(RabbitConfiguration.TRADE_QUEUE, "Hello, Rabbit!");
    }
}

2.消費者消費訊息

配置監聽器監聽指定的 Queue,當訊息佇列有訊息的時候予以接收:

@Component
public class HelloConsumer {
    @RabbitListener(queues = {RabbitConfiguration.TRADE_QUEUE})
    public void processBootTask(String content) {
        System.out.println(content);
    }
}

2.Spring Boot整合RabbitMQ (fanout交換機)

要使用 fanout 交換機的話,那麼生產者與消費者的配置就不一樣了。

1.生產者生產訊息

在 Java 配置類中註冊 bean:

@Configuration
public class RabbitConfiguration {
    public static final String DEFAULT_FANOUT_EXCHANGE = "admin.fanout";
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(DEFAULT_FANOUT_EXCHANGE);
    }
}

在 SpringBoot 中,我們使用 AmqpTemplate 去傳送訊息:

@Component
public class HelloProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send() {
        //引數一:交換機名稱,引數二:傳送的key,引數三:內容
        amqpTemplate.convertAndSend(RabbitConfiguration.DEFAULT_FANOUT_EXCHANGE, "", "Hello, Rabbit!");
    }
}

2.消費者消費訊息

在 Java 配置類中註冊 bean:

@Configuration
public class RabbitConfiguration {
    public static final String DEFAULT_FANOUT_EXCHANGE = "admin.fanout";
    public static final String FANOUT_QUEUE = "admin-" + UUID.randomUUID();
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    //配置廣播路由器
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(DEFAULT_FANOUT_EXCHANGE);
    }
    //配置臨時佇列
    @Bean
    public Queue randomQueue() {
        return new Queue(FANOUT_QUEUE);
    }
    @Bean
    public Binding bindingExchange() {
        return BindingBuilder.bind(randomQueue()).to(fanoutExchange());
    }
}

配置監聽器,監聽 Queue,當訊息佇列有訊息時,監聽器就會接收到訊息(也可配置多個佇列接收):

@Component
public class HelloConsumer {
    @RabbitListener(queues = "#{rabbitConfiguration.FANOUT_QUEUE}")
    public void processBootTask(String content) {
        System.out.println(content);
    }
}

3.Spring Boot整合RabbitMQ (topic交換機)

使用 topic 交換機,生產者與消費者的配置也不一樣。

1.生產者生產訊息

在 Java 配置類中註冊 bean:

@Configuration
public class RabbitConfiguration {
    public static final String DEFAULT_TOPIC_EXCHANGE = "admin.topic";
    public static final String TOPIC_ROUTE_KEY = "A.B.C";
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(DEFAULT_TOPIC_EXCHANGE);
    }
}

在 SpringBoot 中,我們使用 AmqpTemplate 去傳送訊息:

@Component
public class HelloProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void send() {
        //引數一:交換機名稱,引數二:傳送的key,引數三:內容
        amqpTemplate.convertAndSend(RabbitConfiguration.DEFAULT_TOPIC_EXCHANGE, RabbitConfiguration.TOPIC_ROUTE_KEY, "Hello, Rabbit!");
    }
}

RabbitMQ 將會根據引數二去尋找有沒有匹配此規則的佇列,如果有則把訊息給它,如果有且不止一個,則把訊息分發給匹配的佇列 (每個佇列都有訊息)。

2.消費者消費訊息

在 Java 配置類中註冊 bean:

@Configuration
public class RabbitConfiguration {
    public static final String DEFAULT_TOPIC_EXCHANGE = "admin.topic";
    public static final String TOPIC_QUEUE = "admin-" + UUID.randomUUID();
    //*表示一個詞,#表示零個或多個詞
    public static final String TOPIC_ROUTE_KEY = "#.#";
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    //配置主題路由器
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(DEFAULT_TOPIC_EXCHANGE);
    }
    //配置臨時佇列
    @Bean
    public Queue randomQueue() {
        return new Queue(TOPIC_QUEUE);
    }
    @Bean
    public Binding bindingExchange() {
        return BindingBuilder.bind(randomQueue()).to(topicExchange()).with(TOPIC_ROUTE_KEY);
    }
}

配置監聽器,監聽 Queue,當訊息佇列匹配此規則時,監聽器就會接收到訊息:

@Component
public class HelloConsumer {
    @RabbitListener(queues = "#{rabbitConfiguration.TOPIC_QUEUE}")
    public void processBootTask(String content) {
        System.out.println(content);
    }
}

生產端指定 Route Key 為 A.B.C, 下面是消費端繫結 Route Key 的不同情況:

消費端繫結的Route Key 是否匹配
A.B.C Yes
# Yes
A.# Yes
. No
A.* No
A.B.* Yes
A.*.C Yes