RabbitMQ 安裝配置和 Spring 整合
本文從安裝和配置 RabbitMQ 開始,準備好環境後,直接在 Spring 中整合,並且針對 Spring 中的常見用法提供了示例和講解。
安裝
一般開發環境可能用的都是 Windows,生產環境 Linux 用的比較多,這裡針對 Windows 和 Ubuntu 的安裝說明簡單提煉。其他環境可以直接參考官方文件:https://www.rabbitmq.com/download.html
Windows 安裝
Windows 上安裝很容易,先安裝 Erlang/OTP 環境(注意和 RabbitMQ 版本匹配),再安裝 RabbitMQ 即可。
下載地址:
- 版本依賴:
- Erlang/OTP: http://www.erlang.org/downloads
- RabbitMQ: https://www.rabbitmq.com/install-windows.html
Ubuntu 安裝
1 為了使用儲存庫方式安裝最新版本,需要將 RabbitMQ 簽名祕鑰新增到 apt-key
中,從下面兩種方式選擇一種方式執行:
sudo apt-key adv --keyserver "hkps.pool.sks-keyservers.net" --recv-keys "0x6B73A36E6026DFCA"
或者
wget -O - "https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc" | sudo apt-key add -
第二種方式無需金鑰伺服器即可下載和匯入金鑰。
我使用的第一種。
2 然後在 packagecloud 有段指令碼(自動根據伺服器版本選擇對應的安裝源),當前(2018-11-30)的內容如下:
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
3 執行該指令碼後,繼續然後執行下面的命令:
sudo apt-get update
更新後,可以通過下面命令檢視當前的 rabbitmq-server 的可用版本:
apt-cache madison rabbitmq-server
我這裡的結果(2018-12-01)顯示如下:
rabbitmq-server | 3.7.9-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.7.8-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.7.7-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.7.6-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.7.5-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.6.16-2 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.6.16-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.6.15-1 | https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu bionic/main amd64 Packages
rabbitmq-server | 3.6.10-1 | http://archive.ubuntu.com/ubuntu bionic/main amd64 Packages
4 執行下面的命令安裝 rabbitmq-server
sudo apt-get install rabbitmq-server
此時安裝的應該是最新的版本。
可以通過 sudo apt-get install rabbitmq-server=3.7.9-1 安裝指定版本。
配置
接下來主要是在 Ubuntu 環境(Windows 環境類似)進行配置。由於沒有桌面環境,因此先通過命令建立可以外網訪問 rabbitmq 的使用者,然後啟用 management 在通過網頁進行管理。
新增使用者 root,密碼 root。(根據自己需要設定)
sudo rabbitmqctl add_user root root
給 root 新增管理許可權。
sudo rabbitmqctl set_user_tags root administrator
給 root 新增預設虛擬主機的所有許可權。
sudo rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
Windows 中的操作過程
D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat add_user root root Adding user "root" ... D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat set_user_tags root administrator Setting tags for user "root" to [administrator] ... D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin>rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*" Setting permissions for user "root" in vhost "/" ...
啟用 rabbitmq_management
sudo rabbitmq-plugins enable rabbitmq_management
啟用
rabbitmq_management
後不需要重啟服務
此後可以直接訪問 rabbitmq 的 http://RabbitMQ服務IP:15672 通過 WEB 進行管理。
備忘錄(暫時不用關注這裡,測試叢集時可用)
單機啟動多個帶有 rabbitmq_management 節點時的配置RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbitl RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port , 156721}]" rabbitmq-server -detached
參考 RabbitMQ實戰指南 7.1.5 單機多節點配置
準備好 RabbitMQ 環境後,下面直接和 Spring 整合。
初學者建議先通過官方示例瞭解 RabbitMQ 的基本概念和用法:https://www.rabbitmq.com/getstarted.html
Spring 整合
下面先是 Spring 整合的配置,然後是專案中具體的用法。
下面示例所有連結都可以直接開啟展示完整內容。
完整示例地址:https://github.com/abel533/spring-rabbitmq-demo
配置
1 新增相關依賴
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.11.RELEASE</version>
</dependency>
<!-- spring-rabbit 依賴 spring-amqp,下面這個依賴可以不顯示引入 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.7.11.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.11.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
2 配置檔案
將 spring-rabbit 配置單獨放在一個檔案中,需要的時候可以直接在 Spring 中 <import>
。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<!--啟用註解監聽訊息-->
<rabbit:annotation-driven/>
<!--連線工廠配置-->
<rabbit:connection-factory id="rabbitConnectionFactory"
thread-factory="amqpThreadFactory"
virtual-host="${rabbitmq.virtual-host:/}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
channel-cache-size="${rabbitmq.channel-cache-size:30}"
addresses="${rabbitmq.addresses}"/>
<bean id="amqpThreadFactory" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-"/>
</bean>
<!--訊息模板-->
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"
message-converter="amqpMessageConverter"/>
<!--訊息轉換,生產者和消費者都需要 -->
<bean id="amqpMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!--amqp管理-->
<rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>
<!--訊息監聽容器,配合註解監聽訊息-->
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<!--併發消費者數量-->
<property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers:3}"/>
<!--最大數量-->
<property name="maxConcurrentConsumers" value="${rabbitmq.maxConcurrentConsumers:10}"/>
<!--訊息轉換-->
<property name="messageConverter" ref="amqpMessageConverter"/>
<!--任務執行緒池-->
<property name="taskExecutor">
<task:executor id="amqpTaskExecutor" pool-size="${rabbitmq.task-executor.pool-size:100}"/>
</property>
<!--手動確認-->
<property name="acknowledgeMode" value="${rabbitmq.acknowledgeMode:MANUAL}"/>
</bean>
</beans>
3 Spring 配置檔案中需要提供的配置
# rabbitmq 訊息配置
rabbitmq.addresses=localhost:5672
rabbitmq.virtual-host=/
rabbitmq.username=root
rabbitmq.password=root
rabbitmq.channel-cache-size=50
rabbitmq.concurrentConsumers=3
rabbitmq.maxConcurrentConsumers=10
# 確認方式 MANUAL 手動,AUTO 自動,NONE 自動確認
rabbitmq.acknowledgeMode=MANUAL
# 執行緒池數量 = 併發數 * 監聽數
rabbitmq.task-executor.pool-size=100
下面是和 Spring 整合後的用法。
測試中,增加了 spring.xml 配置檔案,內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--載入屬性配置檔案-->
<context:property-placeholder location="classpath:META-INF/spring/application.properties"/>
<!--掃描包-->
<context:component-scan base-package="rabbitmq"/>
<!--Producter 中的任務排程使用-->
<task:scheduler id="taskScheduler"/>
<task:annotation-driven scheduler="taskScheduler"/>
<!--引入 spring-rabbitmq 配置-->
<import resource="classpath*:META-INF/spring/spring-rabbitmq.xml"/>
</beans>
生產者
示例程式碼 如下:
@Component
public class Producter {
public static final Logger logger = LoggerFactory.getLogger(Producter.class);
@Autowired
private AmqpTemplate template;
@Autowired
private AmqpAdmin admin;
@PostConstruct
protected void init() {
//定義交換機
Exchange exchange = ExchangeBuilder.topicExchange("logger").durable(true).build();
admin.declareExchange(exchange);
//還可以定義佇列和繫結
}
final Random random = new Random();
final String[] keys = new String[]{"logger.error", "logger.warn", "logger.info"};
AtomicInteger count = new AtomicInteger();
@Scheduled(fixedDelay = 1000)
protected void product() {
String key = keys[random.nextInt(3)];
int i = count.getAndIncrement();
String message = key + " > " + i + " " + new Date();
User obj = new User(message, i);
template.convertAndSend("logger", key, obj);
logger.info("[Send] " + obj);
}
}
- 在程式碼中直接注入
AmqpTemplate
,用於傳送或接收訊息。 - 根據需要注入
AmqpAdmin
,可以用於建立交換機、佇列和繫結。
上面程式碼中,在 init
初始化中定義了一個交換機。通過 product
定時任務,每隔 1000 毫秒執行一次,呼叫 template.convertAndSend("logger", key, obj);
傳送訊息,傳送的物件會根據前面 spring-rabbit 配置檔案中的訊息轉換器轉換為 JSON 資料進行傳送。
生產者的邏輯可以根據業務需要進行定製。
消費者
消費者有多種用法,這裡使用最方便的註解用法。
在 Consumer 程式碼中,有 3 個例子,這裡拿第一個進行講解:
/**
* 接收物件的例子
*
* 該方法還可以直接注入 org.springframework.amqp.core.Message 物件
*
* @param data
* @param deliveryTag
* @param channel
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "logger.all", durable = "true"),
exchange = @Exchange(value = "logger",
durable = "true",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC),
key = "logger.#"
))
public void all(User data, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, Channel channel) {
try {
//測試用,隨機確認和拒絕(並返回佇列)
if(Math.random() > 0.5d){
logger.info("[reject] deliveryTag:" + deliveryTag + ", message: " + data);
channel.basicReject(deliveryTag, true);
} else {
logger.info("[ack ] deliveryTag:" + deliveryTag + ", message: " + data);
channel.basicAck(deliveryTag, false);
}
} catch (IOException e) {
e.printStackTrace();
}
}
註解
消費者監聽的主要註解就是 @RabbitListener
,上面例子是一個比較複雜的用法,下面從簡單開始說起。
最簡單的情況下,註解用法如下:
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
這種情況下,要求 myQueue
佇列已經存在,這樣就能直接監聽該佇列。除此之外這裡接收的引數要求是字串型別,和消費者傳送的訊息型別需要一致。
再稍微簡單點的情況下,用法如下:
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(String data) {
...
}
實際上這裡已經有些複雜了,這個例子的特點就是,不需要事先存在交換機、佇列和繫結。Spring 在啟動的時候會根據這裡的註解去建立這三者(RabbitMQ 規則是如果佇列、交換機已經存在,在引數相同的情況下會直接複用,不會建立新的,如果引數不同會報錯)。這裡的佇列只用了 @Queue
,因此會建立一個匿名獨佔自動刪除的佇列。交換機的名字指定了 auto.exch
,佇列和交換機通過 invoiceRoutingKey
進行繫結。
現在再來看本例的用法:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "logger.all", durable = "true"),
exchange = @Exchange(value = "logger",
durable = "true",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC),
key = "logger.#"
))
這裡建立了一個指定名稱的佇列,並且配置了持久化。還建立了一個支援持久化的交換機,型別為 TOPIC
,並且忽略交換機的宣告異常(如果已經存在並且屬性不同時,忽略此異常)。通過 logger.#
進行匹配,在主題交換機中,有兩個特殊的字元 *
和 #
,分別匹配一個逗號隔開的單詞和任意(可0)單詞。因此這裡能匹配 logger.info
, logger.xxx.debug
等路由。
除了上面這些常見用法外,還有一個特殊的情況,可以根據接收型別自動匹配的用法,如下:
@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {
@RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
...
}
@RabbitHandler
public String baz(Baz baz) {
...
}
@RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
...
}
}
在類上使用了 @RabbitListener
註解,在方法上使用了 @RabbitHandler
註解。在監聽 someQueue
佇列時,會根據訊息的實際型別,呼叫匹配的方法(Bar
, Baz
和 Qux
)。
特別注意:只有上面這種用法下才會根據型別進行匹配,直接在方法上使用
@RabbitListener
註解時不會自動匹配。
下面來看看這個引數需要注意的地方。
引數
在我們配置的 JSON 轉換中,除了轉換的 JSON 串之外,在訊息中還記錄了型別的資訊。如下圖所示:
可以看到在訊息屬性頭中,通過 __TypeId__
記錄了訊息物件的實際型別,因此在 Spring 中的序列化和反序列化中能夠根據這裡的型別進行轉換,當接收型別和這裡指定的型別不一致時會報錯(只有前面 @RabbitHandler
用法中會去匹配正確的方法,無法匹配時報錯)。
Spring AMQP 中支援以下幾類引數:
- 訊息物件(payload),如果引數型別不能明確匹配時,需要通過
@Payload
指定訊息體。 com.rabbitmq.client.Channel
,訊息通道,可以呼叫 AMQP 的基本方法,常用於 ack 和 reject。@Header
註解的引數,從訊息頭提取指定的資訊。org.springframework.amqp.core.Message
訊息的原始物件。org.springframework.messaging.Message<T>
訊息介面,通過泛型指定訊息體型別,可以在 1 的基礎上額外獲取訊息頭資訊。
ack 和 reject
在本例中,由於要手動 ACK 或 REJECT,所以在訊息體之外還注入了 @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag
和 Channel
。
在業務邏輯執行完成後或者發生異常時,根據具體的情況來選擇執行。
如果業務順利執行完成,我們可以直接通過 channel.basicAck(deliveryTag, false);
確認消費,此後訊息佇列會刪除這條已消費的訊息。
如果業務中出現了異常,需要具體分析,如果只是網路或可以重試的問題,我們可以通過 channel.basicReject(deliveryTag, true);
將訊息返還給訊息佇列。如果出現的是問題是業務邏輯或者就算重複執行仍然有問題的情況,可能就需要通過 channel.basicReject(deliveryTag, false);
刪除該訊息(存在死信佇列的情況會接收該訊息,可以進行後續處理)。
總結
學會使用 RabbitMQ 是一件很容易的事情,但是用好用對是很不容易的事。不同常見和業務都需要考慮使用什麼型別的交換機,使用什麼樣的佇列,每個佇列分配多少個併發,這些都很重要。
想要真正用好訊息佇列,還需要學習很多知識,你可以通過下面的參考資料瞭解更多。
參考資料
在我學 RabbitMQ 的過程中,下面這些資料是特別有用的,都是官方提供的專案文件,必要的時候可以多看幾遍。
- https://www.rabbitmq.com
- https://www.rabbitmq.com/man/rabbitmqctl.8.html
- https://docs.spring.io/spring-amqp/docs/1.7.11.RELEASE/reference/html/index.html
除此之外,我還參考了下面兩本書:
- RabbitMQ實戰:高效部署分散式訊息佇列
- RabbitMQ實戰指南
第一本書更多的像文件,第二本書有更多作者的心得和技巧。