RabbitMQ整合SpringBoot
SpringBoot 是在 Spring AMQP 上面再次封裝了一層,不需要再像 Spring AMQP 一樣注入各個元件 Bean, 只需要在配置檔案上配置好 RabbitMQ 屬性,SpringBoot 就可以自動注入了。
而使用 @RabbitListener 註解可以輕鬆實現消費端事件監聽處理。
一、專案配置
1.1 新增 Spring AMQP 依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.3.1.RELEASE</version> </dependency>
1.2 配置屬性
在 /resources/application.properties 檔案中申明屬性:
# rabbitmq 配置 ## 主機地址 spring.rabbitmq.host=111.231.83.100 ## 埠號 spring.rabbitmq.port=5672 ## 虛擬主機路徑 spring.rabbitmq.virtual-host=/ ## 連線超時時間 spring.rabbitmq.connection-timeout=15000 ## 消費者設定手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual ## 消費者每次消費數量 spring.rabbitmq.listener.simple.concurrency=1 ## 最大消費者數量 spring.rabbitmq.listener.simple.max-concurrency=5 # 自定義屬性 ## 佇列名稱 spring.rabbitmq.listener.order.queue.name=orderQueue ## 佇列是否持久化 spring.rabbitmq.listener.order.queue.durable=true ## 交換機名稱 spring.rabbitmq.listener.order.exchange.name=orderExchange ## 交換機是否持久化 spring.rabbitmq.listener.order.exchange.durable=true ## 交換機型別 spring.rabbitmq.listener.order.exchange.type=topic ## 是否忽略異常 spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true ## 路由 key spring.rabbitmq.listener.order.routingKey=order#
相關屬性表:
屬性名 | 說明 | 預設值 |
---|---|---|
spring.rabbitmq.address | 客戶端連線的地址,有多個的時候使用逗號分隔,該地址可以是IP與Port的結合 | |
spring.rabbitmq.cache.channel.checkout-timeout | 當快取已滿時,獲取Channel的等待時間,單位為毫秒 | |
spring.rabbitmq.cache.channel.size | 快取中保持的Channel數量 | |
spring.rabbitmq.cache.connection.mode | 連線快取的模式 | CHANNEL |
spring.rabbitmq.cache.connection.size | 快取的連線數 | |
spring.rabbitmq.connnection-timeout | 連線超時引數單位為毫秒:設定為“0”代表無窮大 | |
spring.rabbitmq.dynamic | 預設建立一個AmqpAdmin的Bean | true |
spring.rabbitmq.host | RabbitMQ的主機地址 | localhost |
spring.rabbitmq.listener.acknowledge-mode | 容器的acknowledge模式 | |
spring.rabbitmq.listener.auto-startup | 啟動時自動啟動容器 | true |
spring.rabbitmq.listener.concurrency | 消費者的最小數量 | |
spring.rabbitmq.listener.default-requeue-rejected | 投遞失敗時是否重新排隊 | true |
spring.rabbitmq.listener.max-concurrency | 消費者的最大數量 | |
spring.rabbitmq.listener.prefetch | 在單個請求中處理的訊息個數,他應該大於等於事務數量 | |
spring.rabbitmq.listener.retry.enabled | 不論是不是重試的釋出 | false |
spring.rabbitmq.listener.retry.initial-interval | 第一次與第二次投遞嘗試的時間間隔 | 1000 |
spring.rabbitmq.listener.retry.max-attempts | 嘗試投遞訊息的最大數量 | 3 |
spring.rabbitmq.listener.retry.max-interval | 兩次嘗試的最大時間間隔 | 10000 |
spring.rabbitmq.listener.retry.multiplier | 上一次嘗試時間間隔的乘數 | 1.0 |
spring.rabbitmq.listener.retry.stateless | 不論重試是有狀態的還是無狀態的 | true |
spring.rabbitmq.listener.transaction-size | 在一個事務中處理的訊息數量。為了獲得最佳效果,該值應設定為小於等於每個請求中處理的訊息個數,即spring.rabbitmq.listener.prefetch的值 | |
spring.rabbitmq.password | 登入到RabbitMQ的密碼 | |
spring.rabbitmq.port | RabbitMQ的埠號 | 5672 |
spring.rabbitmq.publisher-confirms | 開啟Publisher Confirm機制 | false |
spring.rabbitmq.publisher-returns | 開啟publisher Return機制 | false |
spring.rabbitmq.requested-heartbeat | 請求心跳超時時間,單位為秒 | |
spring.rabbitmq.ssl.enabled | 啟用SSL支援 | false |
spring.rabbitmq.ssl.key-store | 儲存Sspring.rabbitmq.listener.simple.acknowledge-modeSL證書的地址 | |
spring.rabbitmq.ssl.key-store-password | 訪問SSL證書的地址使用的密碼 | |
spring.rabbitmq.ssl.trust-store | SL的可信地址 | |
spring.rabbitmq.ssl.trust-store-password | 訪問SSL的可信地址的密碼 | |
spring.rabbitmq.ssl.algorithm | SSL演算法,預設使用Rabbit的客戶端演算法庫 | |
spring.rabbitmq.template.mandatory | 啟用強制資訊 | false |
spring.rabbitmq.template.receive-timeout | receive()方法的超時時間 | 0 |
spring.rabbitmq.template.reply-timeout | sendAndReceive()方法的超時時間 | 5000 |
spring.rabbitmq.template.retry.enabled | 設定為true的時候RabbitTemplate能夠實現重試 | false |
spring.rabbitmq.template.retry.initial-interval | 第一次與第二次釋出訊息的時間間隔 | 1000 |
spring.rabbitmq.template.retry.max-attempts | 嘗試釋出訊息的最大數量 | 3 |
spring.rabbitmq.template.retry.max-interval | 嘗試釋出訊息的最大時間間隔 | 10000 |
spring.rabbitmq.template.retry.multiplier | 上一次嘗試時間間隔的乘數 | 1.0 |
spring.rabbitmq.username | 登入到RabbitMQ的使用者名稱 | |
spring.rabbitmq.virtual-host | 連線到RabbitMQ的虛擬主機 |
1.3 開啟 @RabbitListener 註解
-
通過 @EnableRabbit 註解來啟用 @RabbitListener。
-
@RabbitListener 註解指定了目標方法來作為消費訊息的方法,通過註解引數指定所監聽的佇列或者 Binding 。使用 @RabbitListener 可以設定一個自己明確預設值的 RabbitListenerContainerFactory 物件。
@EnableRabbit
@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootApplication.class, args);
}
}
二、簡單示例
2.1 配置交換機、佇列、繫結關係
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange orderExchange(){
return new TopicExchange("simpleExchange", true, false);
}
@Bean
public Queue orderQueue(){
return new Queue("simpleQueue", true);
}
@Bean
public Binding beanBinding(TopicExchange orderExchange, Queue orderQueue) {
return BindingBuilder
// 建立佇列
.bind(orderQueue)
// 建立交換機
.to(orderExchange)
// 指定路由 Key
.with("simple.#");
}
}
2.2 建立消費者物件
@Component
public class Consume {
@RabbitListener(queues = "simpleQueue")
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Payload: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
2.3 建立生產者物件
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendSimpleMessage() {
String exchange = "simpleExchange";
String routingKey = "simple.message";
rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange");
}
}
2.4 測試
先啟動應用,然後執行 sendSimpleMessage() 方法,觀察控制檯輸出:
--------------------------------------
消費端Payload: hello simpleExchange
說明 @RabbitListener 生效了。
三、@RabbitListener 高階使用
3.1 在 @RabbitListener 中申明繫結關係
消費者:
public class BindingConsume {
@RabbitListener(
queues = "simpleQueue")
@RabbitListener(
bindings =
@QueueBinding(
exchange = @Exchange(value = "bindingExchange",
type = "topic"),
value = @Queue(value = "bindingQueue",
durable = "true"),
key = "binding.*"
)
)
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Payload: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
生產者:
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendBindingMessage() {
String exchange = "bindingExchange";
String routingKey = "binding.message";
rabbitTemplate.convertAndSend(exchange, routingKey, "hello bindingExchange");
}
}
先啟動應用,然後執行 sendBindingMessage() 方法,觀察控制檯輸出:
--------------------------------------
消費端Payload: hello bindingExchange
說明 @RabbitListener 生效了。
3.2 自定義訊息引數型別
建立訂單物件
// 記得需要實現序列化介面
public class Order implements Serializable{
private String orderId;
private BigDecimal amount;
public Order() {
}
public Order(String orderId, BigDecimal amount) {
this.orderId = orderId;
this.amount = amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public BigDecimal getAmount() {
return amount;
}
public void setAmount(BigDecimal amount) {
this.amount = amount;
}
@Override
public String toString() {
return "Order{" +
"orderId='" + orderId + '\'' +
", amount=" + amount +
'}';
}
}
建立消費者
@Component
public class OrderConsume {
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(value = "orderQueue",
durable = "true"),
exchange = @Exchange(value = "orderExchange",
type = "topic"),
key = "order.*"
)
)
public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端order: " + order.toString());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手工ACK
channel.basicAck(deliveryTag, false);
}
}
建立生產者
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendOrderMessage() {
String exchange = "orderExchange";
String routingKey = "order.message";
Order newOrder = new Order("10001", BigDecimal.valueOf(300));
rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
}
}
先啟動應用,然後執行 sendOrderMessage() 方法,觀察控制檯輸出:
--------------------------------------
消費端order: Order{orderId='10001', amount=300}
3.3 @RabbitListener 和@RabbitHandler 的使用
@RabbitListener 除了在方法上面宣告,也可以在類上面申明 。當宣告在類上之後,就需要和@RabbitHandler 搭配使用:
- @RabbitListener 可以標註在類上面,需配合 @RabbitHandler 註解一起使用;
- @RabbitListener 標註在類上面表示當有收到訊息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換後的引數型別;
- 當找不到對應的引數型別方法時將會丟擲 ListenerExecutionFailedException 異常,可以通過設定 @RabbitHandler 的 isDefault 屬性為 true 為它宣告一個預設處理方法。
建立 Consume:
@Component
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(value = "handlerQueue",
durable = "true"),
exchange = @Exchange(value = "handlerExchange",
type = "topic"),
key = "handler.*"
)
)
public class HandlerConsume {
@RabbitHandler(isDefault = true)
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端預設處理: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
@RabbitHandler
public void onMessage(@Payload String msg, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端String引數: " + msg);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
@RabbitHandler
public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Order引數: " + order.toString());
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
建立生產者:
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendHandlerStringMessage() {
String exchange = "handlerExchange";
String routingKey = "handler.message";
// 傳送字串訊息
rabbitTemplate.convertAndSend(exchange, routingKey, "hello handlerExchange");
}
@Test
public void sendHandlerOrderMessage() {
String exchange = "handlerExchange";
String routingKey = "handler.message";
// 傳送物件訊息
Order newOrder = new Order("10001", BigDecimal.valueOf(300));
rabbitTemplate.convertAndSend(exchange, routingKey, newOrder);
}
@Test
public void sendHandlerArrayMessage() {
String exchange = "handlerExchange";
String routingKey = "handler.message";
// 傳送字元陣列訊息
String[] array = {"123","456","789"};
rabbitTemplate.convertAndSend(exchange, routingKey, array);
}
}
先啟動應用,然後分別執行 sendHandlerStringMessage() 、sendHandlerOrderMessage() 和 sendHandlerArrayMessage 方法,觀察控制檯輸出:
--------------------------------------
消費端String引數: hello handlerExchange
--------------------------------------
消費端Order引數: Order{orderId='10001', amount=300}
--------------------------------------
消費端預設處理: [Ljava.lang.String;@7c655a9a
說明 @RabbitListener 和@RabbitHandler 生效了。