Spring Boot 構建應用——整合訊息中介軟體 RabbitMQ
RabbitMQ 是訊息中介軟體的一種,實現了 AMQP 標準。訊息中介軟體的工作過程可以用生產者-消費者模型來表示。生產者發訊息到訊息佇列中去,消費者監聽指定的訊息佇列,並且當訊息佇列收到訊息之後,接收訊息佇列傳來的訊息,並且給予相應的處理。訊息佇列常用於分散式系統之間互相資訊的傳遞。
對於 RabbitMQ 來說,這三個基本模組以外,還添加了一個模組,交換機 (Exchange)。它使得生產者和訊息佇列之間產生了隔離,生產者將訊息傳送給交換機,而交換機則根據排程策略把相應的訊息轉發給對應的訊息佇列。
交換機的主要作用是接收相應的訊息並且繫結到指定的佇列,有三種類型的交換機:
交換機 | 說明 |
---|---|
direct | 預設的交換機 (一對一)。即建立訊息佇列的時候,指定一個BindingKey,當生產者傳送訊息的時候,指定對應的Key,當Key和訊息佇列的BindingKey一致的時候,訊息將會被髮送到該訊息佇列中。 |
fanout | 路由廣播的形式,將會把訊息發給繫結它的全部佇列,即便設定了key,也會被忽略。生產者其實僅關注Exchange與Route Key, 消費者僅關注Queue |
topic | Topic轉發資訊主要是依據萬用字元,佇列和交換機的繫結主要是依據一種模式(萬用字元+字串),而當傳送訊息的時候,只有指定的Key和該模式相匹配的時候,訊息才會被髮送到該訊息佇列中。 |
Spring Boot 整合 RabbitMQ 非常簡單,需要新增 Maven 起步依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一些核心類如下:
org.springframework.amqp.core.Queue: 佇列 org.springframework.amqp.core.Binding: 建立交換機與佇列的繫結關係 org.springframework.amqp.core.DirectExchange: Direct交換機 org.springframework.amqp.core.TopicExchange: Topic交換機 org.springframework.amqp.core.FanoutExchange: Fanout交換機 org.springframework.amqp.support.converter.MessageConverter: 訊息轉換器, 如將Java類轉換JSON型別傳送至Broker, 從Broker處獲取JSON訊息轉換為Java型別 org.springframework.amqp.core.AmqpTemplate: 多用於生產者端釋出訊息 org.springframework.amqp.core.AmqpAdmin: 用於Exchange, Queue等的動態管理
然後通過 application.properties 中的 spring.rabbitmq.* 字首配置屬性(生產者、消費者應用都一樣):
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/admin
下面看一下在三種類型的交換機下的不同實現。
1.Spring Boot整合RabbitMQ (direct交換機)
在 Java 配置類中註冊 bean(生產者和消費者的是一樣的,因為監聽的是同一個佇列,所以佇列名要先約定好):
@Configuration
public class RabbitConfiguration {
//佇列名
public static final String TRADE_QUEUE = "funds";
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue queue() {
return new Queue(TRADE_QUEUE);
}
}
1.生產者生產訊息
在 SpringBoot 中,我們使用 AmqpTemplate 去傳送訊息(呼叫 send 方法即傳送):
@Component
public class HelloProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
amqpTemplate.convertAndSend(RabbitConfiguration.TRADE_QUEUE, "Hello, Rabbit!");
}
}
2.消費者消費訊息
配置監聽器監聽指定的 Queue,當訊息佇列有訊息的時候予以接收:
@Component
public class HelloConsumer {
@RabbitListener(queues = {RabbitConfiguration.TRADE_QUEUE})
public void processBootTask(String content) {
System.out.println(content);
}
}
2.Spring Boot整合RabbitMQ (fanout交換機)
要使用 fanout 交換機的話,那麼生產者與消費者的配置就不一樣了。
1.生產者生產訊息
在 Java 配置類中註冊 bean:
@Configuration
public class RabbitConfiguration {
public static final String DEFAULT_FANOUT_EXCHANGE = "admin.fanout";
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(DEFAULT_FANOUT_EXCHANGE);
}
}
在 SpringBoot 中,我們使用 AmqpTemplate 去傳送訊息:
@Component
public class HelloProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
//引數一:交換機名稱,引數二:傳送的key,引數三:內容
amqpTemplate.convertAndSend(RabbitConfiguration.DEFAULT_FANOUT_EXCHANGE, "", "Hello, Rabbit!");
}
}
2.消費者消費訊息
在 Java 配置類中註冊 bean:
@Configuration
public class RabbitConfiguration {
public static final String DEFAULT_FANOUT_EXCHANGE = "admin.fanout";
public static final String FANOUT_QUEUE = "admin-" + UUID.randomUUID();
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//配置廣播路由器
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(DEFAULT_FANOUT_EXCHANGE);
}
//配置臨時佇列
@Bean
public Queue randomQueue() {
return new Queue(FANOUT_QUEUE);
}
@Bean
public Binding bindingExchange() {
return BindingBuilder.bind(randomQueue()).to(fanoutExchange());
}
}
配置監聽器,監聽 Queue,當訊息佇列有訊息時,監聽器就會接收到訊息(也可配置多個佇列接收):
@Component
public class HelloConsumer {
@RabbitListener(queues = "#{rabbitConfiguration.FANOUT_QUEUE}")
public void processBootTask(String content) {
System.out.println(content);
}
}
3.Spring Boot整合RabbitMQ (topic交換機)
使用 topic 交換機,生產者與消費者的配置也不一樣。
1.生產者生產訊息
在 Java 配置類中註冊 bean:
@Configuration
public class RabbitConfiguration {
public static final String DEFAULT_TOPIC_EXCHANGE = "admin.topic";
public static final String TOPIC_ROUTE_KEY = "A.B.C";
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(DEFAULT_TOPIC_EXCHANGE);
}
}
在 SpringBoot 中,我們使用 AmqpTemplate 去傳送訊息:
@Component
public class HelloProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
//引數一:交換機名稱,引數二:傳送的key,引數三:內容
amqpTemplate.convertAndSend(RabbitConfiguration.DEFAULT_TOPIC_EXCHANGE, RabbitConfiguration.TOPIC_ROUTE_KEY, "Hello, Rabbit!");
}
}
RabbitMQ 將會根據引數二去尋找有沒有匹配此規則的佇列,如果有則把訊息給它,如果有且不止一個,則把訊息分發給匹配的佇列 (每個佇列都有訊息)。
2.消費者消費訊息
在 Java 配置類中註冊 bean:
@Configuration
public class RabbitConfiguration {
public static final String DEFAULT_TOPIC_EXCHANGE = "admin.topic";
public static final String TOPIC_QUEUE = "admin-" + UUID.randomUUID();
//*表示一個詞,#表示零個或多個詞
public static final String TOPIC_ROUTE_KEY = "#.#";
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//配置主題路由器
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(DEFAULT_TOPIC_EXCHANGE);
}
//配置臨時佇列
@Bean
public Queue randomQueue() {
return new Queue(TOPIC_QUEUE);
}
@Bean
public Binding bindingExchange() {
return BindingBuilder.bind(randomQueue()).to(topicExchange()).with(TOPIC_ROUTE_KEY);
}
}
配置監聽器,監聽 Queue,當訊息佇列匹配此規則時,監聽器就會接收到訊息:
@Component
public class HelloConsumer {
@RabbitListener(queues = "#{rabbitConfiguration.TOPIC_QUEUE}")
public void processBootTask(String content) {
System.out.println(content);
}
}
生產端指定 Route Key 為 A.B.C, 下面是消費端繫結 Route Key 的不同情況:
消費端繫結的Route Key | 是否匹配 |
---|---|
A.B.C | Yes |
# | Yes |
A.# | Yes |
. | No |
A.* | No |
A.B.* | Yes |
A.*.C | Yes |