spring boot實戰(番外篇)整合RabbitMQ
前言
最近幾篇文章將圍繞訊息中介軟體RabbitMQ展開,對於RabbitMQ基本概念這裡不闡述,主要講解RabbitMQ的基本用法、Java客戶端API介紹、spring Boot與RabbitMQ整合、
Spring Boot與RabbitMQ整合原始碼分析。
RabbitMQ安裝
在使用訊息中介軟體RabbitMQ之前就是安裝RabbitMQ。
安裝erlang:yum install erlang
下載RabbitMQ安裝包: https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-generic-unix-3.5.6.tar.gz
解壓安裝包、配置環境變數RABBITMQ_HOME
參考網址:https://www.rabbitmq.com/install-generic-unix.html
windows: https://www.rabbitmq.com/install-windows.html
RabbitMQ配置
1.安裝完成後需要對RabbitMQ進行配置,在etc/rabbitmq目錄下建立兩個檔案:
rabbitmq-env.conf 環境資訊配置
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
RABBITMQ_NODE_PORT=5672
RABBITMQ_NODENAME=node01
rabbitmq.config 核心配置檔案
[{rabbit, [{loopback_users, []}]}].
該配置表示是的預設使用者guest使用者可以遠端訪問mq(廣域網不能訪問,內網可以訪問)
2.啟動RabbitMQ 執行命令 rabbitmq-server
RabbitMQ 3.5.4. Copyright (C) 2007-2015 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01.log
###### ## /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01-sasl.log
##########
Starting broker... completed with 0 plugins.
3. RabbitMQ提供WEB-UI管理控制檯,使用 rabbitmq-plugins enable rabbitmq_management命令啟用,重啟後可以看到
Starting broker... completed with 6 plugins.
表明WEB-UI控制檯啟動成功,訪問:http://localhost:15672/
登陸進入:
通過該控制檯可以方便管理RabbitMQ。
建立Test使用者
RabbitMQ預設使用guest使用者,下面講述如何建立一個test使用者,最快捷的做法使用web管理控制檯
這裡使用命令建立:
rabbitmqctl add_user test test
rabbitmqctl set_user_tags test administrator
tag分為四種"management", "policymaker", "monitoring" "administrator" 詳見 http://www.rabbitmq.com/management.html
RabbitMQ 其他
在實際使用RabbitMQ中還需要涉及到 RabbitMQ的叢集、高可用(採用映象佇列實現)以後有機會再詳細闡述,有興趣可參考https://www.rabbitmq.com/documentation.html
RabbitMQ Java Client
RabbitMQ 客戶端支援語言種類繁多,官方都一一舉例:https://www.rabbitmq.com/getstarted.html
這裡主要自己開發一個小的demo
訊息消費者
操作步驟:
建立連線工廠ConnectionFactory
獲取連線Connection
通過連接獲取通訊通道Channel
宣告交換機Exchange:交換機型別分為四類:
FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
HeadersExchange :通過新增屬性key-value匹配
DirectExchange:按照routingkey分發到指定佇列
TopicExchange:多關鍵字匹配
宣告佇列Queue
將佇列和交換機繫結
建立消費者
執行訊息的消費
package org.lkl.mq.rabbitmq.test;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 客戶端01
*
* @author liaokailin
* @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $
*/
public class Receive01 {
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
ConsumerCancelledException, InterruptedException {
ConnectionFactory facotry = new ConnectionFactory();
facotry.setUsername("test");
facotry.setPassword("test");
facotry.setVirtualHost("test");
facotry.setHost("localhost");
Connection conn = facotry.newConnection(); //獲取一個連結
//通過Channel進行通訊
Channel channel = conn.createChannel();
int prefetchCount = 1;
channel.basicQos(prefetchCount); //保證公平分發
boolean durable = true;
//宣告交換機
channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey過濾
//宣告佇列
String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue();
//將佇列和交換機繫結
String routingKey = "lkl-0";
//佇列可以多次繫結,繫結不同的交換機或者路由key
channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey);
//建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//將消費者和佇列關聯
channel.basicConsume(queueName, false, consumer); // 設定為false表面手動確認訊息消費
//獲取訊息
System.out.println(" Wait message ....");
while (true) {
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
String key = delivery.getEnvelope().getRoutingKey();
System.out.println(" Received '" + key + "':'" + msg + "'");
System.out.println(" Handle message");
TimeUnit.SECONDS.sleep(3); //mock handle message
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //確定該訊息已成功消費
}
}
}
訊息生產者
操作步驟:
建立連線工廠ConnectionFactory
獲取連線Connection
通過連接獲取通訊通道Channel
傳送訊息
package org.lkl.mq.rabbitmq.test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* 訊息publish
*
* @author liaokailin
* @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $
*/
public class Send {
public final static String EXCHANGE_NAME = "test-exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
/**
* 配置amqp broker 連線資訊
*/
ConnectionFactory facotry = new ConnectionFactory();
facotry.setUsername("test");
facotry.setPassword("test");
facotry.setVirtualHost("test");
facotry.setHost("localhost");
Connection conn = facotry.newConnection(); //獲取一個連結
//通過Channel進行通訊
Channel channel = conn.createChannel();
// channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消費者已建立,這裡可不宣告
channel.confirmSelect(); //Enables publisher acknowledgements on this channel
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("[handleNack] :" + deliveryTag + "," + multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("[handleAck] :" + deliveryTag + "," + multiple);
}
});
String message = "lkl-";
//訊息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
//傳送多條資訊,每條訊息對應routekey都不一致
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN,
(message + i).getBytes());
System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2)));
}
}
}
在設定訊息被消費的回撥前需顯示呼叫
channel.confirmSelect()
否則回撥函式無法呼叫
先執行消費者,消費者會輪詢是否有訊息的到來,在web控制也可以觀察哦~~,再啟動生產者傳送訊息。
================================
前言
本篇主要講述spring Boot與RabbitMQ的整合,內容非常簡單,純API的呼叫操作。 操作之間需要加入依賴Jar
1
2
3
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId> <br>/dependency>
訊息生產者
不論是建立訊息消費者或生產者都需要ConnectionFactory
ConnectionFactory配置
建立AmqpConfig檔案AmqpConfig.java(後期的配置都在該檔案中)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class AmqpConfig {
public static final String EXCHANGE = "spring-boot-exchange";
public static final String ROUTINGKEY = "spring-boot-routingKey";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); //必須要設定
return connectionFactory;
}
}
這裡需要顯示呼叫
connectionFactory.setPublisherConfirms(true);
才能進行訊息的回撥。
RabbitTemplate
通過使用RabbitTemplate來對開發者提供API操作
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必須是prototype型別
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
這裡設定為原型,具體的原因在後面會講到
在傳送訊息時通過呼叫RabbitTemplate中的如下方法
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
exchange:交換機名稱
routingKey:路由關鍵字
object:傳送的訊息內容
correlationData:訊息ID
因此生產者程式碼詳單簡潔
Send.java
@Component
public class Send {
private RabbitTemplate rabbitTemplate;
/**
* 構造方法注入
*/
@Autowired
public Send(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
}
}
如果需要在生產者需要訊息傳送後的回撥,需要對rabbitTemplate設定ConfirmCallback物件,由於不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設定為單例bean,則所有的rabbitTemplate
實際的ConfirmCallback為最後一次申明的ConfirmCallback。
下面給出完整的生產者程式碼:
package com.lkl.springboot.amqp;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 訊息生產者
*
* @author liaokailin
* @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
*/
@Component
public class Send implements RabbitTemplate.ConfirmCallback {
private RabbitTemplate rabbitTemplate;
/**
* 構造方法注入
*/
@Autowired
public Send(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最後設定的內容
}
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
}
/**
* 回撥
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(" 回撥id:" + correlationData);
if (ack) {
System.out.println("訊息成功消費");
} else {
System.out.println("訊息消費失敗:" + cause);
}
}
}
訊息消費者
消費者負責申明交換機(生產者也可以申明)、佇列、兩者的繫結操作。
交換機
/**
* 針對消費者配置
FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
HeadersExchange :通過新增屬性key-value匹配
DirectExchange:按照routingkey分發到指定佇列
TopicExchange:多關鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE);
}
在Spring Boot中交換機繼承AbstractExchange類
佇列
@Bean
public Queue queue() {
return new Queue("spring-boot-queue", true); //佇列持久
}
繫結
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}
完成以上工作後,在spring boot中通過訊息監聽容器實現訊息的監聽,在訊息到來時執行回撥操作。
訊息消費
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("receive msg : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認訊息成功消費
}
});
return container;
}
下面給出完整的配置檔案:
package com.lkl.springboot.amqp;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import com.rabbitmq.client.Channel;
/**
* Qmqp Rabbitmq
*
* http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
*
* @author lkl
* @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
*/
@Configuration
public class AmqpConfig {
public static final String EXCHANGE = "spring-boot-exchange";
public static final String ROUTINGKEY = "spring-boot-routingKey";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); //必須要設定
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必須是prototype型別
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/**
* 針對消費者配置
* 1. 設定交換機型別
* 2. 將佇列繫結到交換機
*
*
FanoutExchange: 將訊息分發到所有的繫結佇列,無routingkey的概念
HeadersExchange :通過新增屬性key-value匹配
DirectExchange:按照routingkey分發到指定佇列
TopicExchange:多關鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE);
}
@Bean
public Queue queue() {
return new Queue("spring-boot-queue", true); //佇列持久
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("receive msg : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認訊息成功消費
}
});
return container;
}
}
以上完成 Spring Boot與RabbitMQ的整合
自動配置
在Spring Boot中實現了RabbitMQ的自動配置,在配置檔案中新增如下配置資訊
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtualHost=test
後會自動建立ConnectionFactory以及RabbitTemplate對應Bean,為什麼上面我們還需要手動什麼呢?
自動建立的ConnectionFactory無法完成事件的回撥,即沒有設定下面的程式碼
connectionFactory.setPublisherConfirms(true);
具體分析見後續文章的原始碼解讀.
=========================================
前言
本篇開始講述Spring Boot如何整合RabbitMQ(實際上Spring就整合了RabbitMQ)。
RabbitAdmin
在上篇中遺留AmqpAdmin沒有講解,現在來看下該部分程式碼
public AmqpAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
建立RabbitAdmin例項,呼叫構造方法
public RabbitAdmin(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}
建立連線工廠、rabbitTemplate,其中ConnectionFactory採用上一篇中自定義bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true); //必須要設定
return connectionFactory;
}
為CachingConnectionFactory例項,其快取模式為通道快取
private volatile CacheMode cacheMode = CacheMode.CHANNEL;
接下來看下RabbitAdmin類定義:
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {
...
}
實現介面AmqpAdmin(定義若干RabbitMQ操作父介面),這裡需要強調的是InitializingBean,實現該介面則會呼叫afterPropertiesSet方法
public void afterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
if (this.running || !this.autoStartup) {
return;
}
if (this.connectionFactory instanceof CachingConnectionFactory &&
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
return;
}
this.connectionFactory.addConnectionListener(new ConnectionListener() {
// Prevent stack overflow...
private final AtomicBoolean initializing = new AtomicBoolean(false);
@Override
public void onCreate(Connection connection) {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...
return;
}
try {
initialize();
}
finally {
initializing.compareAndSet(true, false);
}
}
@Override
public void onClose(Connection connection) {
}
});
this.running = true;
}
}
synchronized (this.lifecycleMonitor)加鎖保證同一時間只有一個執行緒訪問該程式碼,隨後呼叫this.connectionFactory.addConnectionListener新增連線監聽,各連線工廠關係:
實際呼叫為CachingConnectionFactory
public void addConnectionListener(ConnectionListener listener) {
super.addConnectionListener(listener);
// If the connection is already alive we assume that the new listener wants to be notified
if (this.connection != null) {
listener.onCreate(this.connection);
}
}
此時connection為null,無法執行到listener.onCreate(this.connection); 往CompositeConnectionListener connectionListener中新增監聽資訊,最終保證在集合中
private List<ConnectionListener> delegates = new CopyOnWriteArrayList<ConnectionListener>();
這裡新增的監聽程式碼執行,在後面呼叫時再來講解。
至此~~ RabbitAdmin建立完成。
Exchange
接下來繼續來看AmqpConfig.java中的程式碼
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE);
}
以上程式碼建立一個交換機,交換機型別為direct
在申明交換機時需要指定交換機名稱,預設建立可持久交換機
Queue
public Queue queue() {
return new Queue("spring-boot-queue", true); //佇列持久
}
預設建立可持久佇列
Binding
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}
BindingBuilder.bind(queue()) 實現為:
public static DestinationConfigurer bind(Queue queue) {
return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE);
}
DestinationConfigurer通過name、type區分不同配置資訊,其to()方法為過載方法,傳遞引數為四種交換機,分別返回XxxExchangeRoutingKeyConfigurer,其中with方法返回Bingding例項,因此在Binding資訊中儲存了
佇列、交換機、路由key等相關資訊
public class Binding extends AbstractDeclarable {
public static enum DestinationType {
QUEUE, EXCHANGE;
}
private final String destination;
private final String exchange;
private final String routingKey;
private final Map<String, Object> arguments;
private final DestinationType destinationType;
...
}
以上資訊理解都非常簡單,下面來看比較複雜點的SimpleMessageListenerContainer
SimpleMessageListenerContainer
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("receive msg : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認訊息成功消費
}
});
return container;
}
檢視其實現的介面,注意SmartLifecycle
接下來設定佇列資訊,在AbstractMessageListenerContainer
private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();
新增佇列資訊
AbstractMessageListenerContainer#exposeListenerChannel設定為true
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
設定併發消費者數量,預設情況為1
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
設定消費者成功消費訊息後確認模式,分為兩種
自動模式,預設模式,在RabbitMQ Broker訊息傳送到消費者後自動刪除
手動模式,消費者客戶端顯示編碼確認訊息消費完成,Broker給生產者傳送回撥,訊息刪除
接下來設定消費者端訊息監聽,為privatevolatile Object messageListener 賦值
到這裡訊息監聽容器也建立完成了,但令人納悶的時,消費者如何去消費訊息呢?從這裡完全看不出來。那麼接下來看下SmartLifecycle介面
SmartLifecycle
熟悉Spring都應該知道該介面,其定義為:
public interface SmartLifecycle extends Lifecycle, Phased {
boolean isAutoStartup();
void stop(Runnable callback);
}
其中的isAutoStartup設定為true時,會自動呼叫Lifecycle介面中的start方法,既然我們為原始碼分析,也簡單看下這個聰明的宣告週期介面是如何實現它的聰明方法的
在spring boot實戰(第十篇)Spring boot Bean載入原始碼分析中講到執行Bean載入時,呼叫AbstractApplicationContext#refresh(),其中存在一個方法呼叫finishRefresh()
[html] view plain copy
protected void finishRefresh() {
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
其中initLifecycleProcessor初始化生命週期處理器,
[html] view plain copy
protected void initLifecycleProcessor() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {
this.lifecycleProcessor =
beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);
if (logger.isDebugEnabled()) {
logger.debug("Using LifecycleProcessor [" + this.lifecycleProcessor + "]");
}
}
else {
DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();
defaultProcessor.setBeanFactory(beanFactory);
this.lifecycleProcessor = defaultProcessor;
beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);
if (logger.isDebugEnabled()) {
logger.debug("Unable to locate LifecycleProcessor with name '" +
LIFECYCLE_PROCESSOR_BEAN_NAME +
"': using default [" + this.lifecycleProcessor + "]");
}
}
}
註冊DefaultLifecycleProcessor對應bean
getLifecycleProcessor().onRefresh()呼叫DefaultLifecycleProcessor中方法onRefresh,呼叫startBeans(true)
[html] view plain copy
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
Lifecycle bean = entry.getValue();
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
LifecycleGroup group = phases.get(phase);
if (group == null) {
group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
phases.put(phase, group);
}
group.add(entry.getKey(), bean);
}
}
if (phases.size() > 0) {
List<Integer> keys = new ArrayList<Integer>(phases.keySet());
Collections.sort(keys);
for (Integer key : keys) {
phases.get(key).start();
}
}
}
其中
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
獲取所有實現Lifecycle介面bean,執行bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()判斷,如果bean同時也為Phased例項,則加入到LifecycleGroup中,隨後phases.get(key).start()呼叫start方法
接下來要做的事情就很明顯:要了解消費者具體如何實現,檢視SimpleMessageListenerContainer中的start是如何實現的。
至此~~整合RabbitMQ原始碼分析準備工作完成,下一篇中正式解讀消費者的實現。
==============================
踩坑記錄
近日在用spring boot架構一個微服務框架,服務發現與治理、釋出REST介面各種輕鬆愜意。但是服務當設計MQ入口時,就發現遇到無數地雷,現在整理成下文,供各路大俠圍觀與嘲笑。
版本
當前使用的spring-boot-starter-amqp版本為2016.5釋出的1.3.5.RELEASE
也許若干年後,你們版本都不會有這些問題了。:(
RabbitMQ
當需要用到MQ的時候,我的第一反映就是使用RabbitMQ,貓了一眼spring boot的官方說明,上面說spring boot為rabbit準備了spring-boot-starter-amqp,並且為RabbitTemplate和RabbitMQ提供了自動配置選項。暗自竊喜~~
瞅瞅[官方文件]http://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq和例子,SO EASY,再看一眼GITHUB上的官方例了,也有例子。
心情愉悅的照著例子,開幹~~。
踩坑
十五分鐘後的程式碼類似這樣:
@Service
@RabbitListener(queues = "merchant")
public class MQReceiver {
protected Logger logger = Logger.getLogger(MQReceiver.class
.getName());
@RabbitHandler
public void process(@Payload UpdateMerchant request) {
UpdateMerchantResponse response = new UpdateMerchantResponse();
logger.info(request.getMerchantId() + "->" + response.getReturnCode());
}
}
消費資訊後,應該記錄一條日誌。
結果得到只有org.springframework.amqp.AmqpException: No method found for class [B 這個異常,並且還無限迴圈丟擲這個異常。。。
記得剛才官方文件好像說了異常什麼的,轉身去貓一眼,果然有:
If retries are not enabled and the listener throws an exception, by default the delivery will be retried indefinitely. You can modify this behavior in two ways; set the defaultRequeueRejected
property to false
and zero re-deliveries will be attempted; or, throw an AmqpRejectAndDontRequeueException
to signal the message should be rejected. This is the mechanism used when retries are enabled and the maximum delivery attempts are reached.
知道了為啥會無限重試了,下面來看看為啥會丟擲這個異常,google搜一下,貌似還有一個倒黴鬼遇到了這個問題。
進去看完問題和大神的解答,豁然開朗。
There are two conversions in the @RabbitListener pipeline.
The first converts from a Spring AMQP Message to a spring-messaging Message.
There is currently no way to change the first converter from SimpleMessageConverter which handles String, Serializable and passes everything else as byte[].
The second converter converts the message payload to the method parameter type (if necessary).
With method-level @RabbitListeners there is a tight binding between the handler and the method.
With class-level @RabbitListener s, the message payload from the first conversion is used to select which method to invoke. Only then, is the argument conversion attempted.
This mechanism works fine with Java Serializable objects since the payload has already been converted before the method is selected.
However, with JSON, the first conversion returns a byte[] and hence we find no matching @RabbitHandler.
We need a mechanism such that the first converter is settable so that the payload is converted early enough in the pipeline to select the appropriate handler method.
A ContentTypeDelegatingMessageConverter is probably most appropriate.
And, as stated in AMQP-574, we need to clearly document the conversion needs for a @RabbitListener, especially when using JSON or a custom conversion.
得嘞,官方示例果然是坑,試試大神的解決方案,手動新增下轉換。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
然後在生產和消費資訊的地方使用他們:
@RabbitListener(queues = "merchant", containerFactory="rabbitListenerContainerFactory")
public void process(@Payload UpdateMerchant request) {
UpdateMerchantResponse response = new UpdateMerchantResponse();
logger.info(request.getMerchantId() + "->" + response.getReturnCode());
}
再來一次,果然可以了
c.l.s.m.service.MQReceiver : 00000001->null
---------------------
作者:子非魚yy
來源:CSDN
原文:https://blog.csdn.net/ztx114/article/details/78328048
版權宣告:本文為博主原創文章,轉載請附上博文連結!