使用 RabbitMQ 實現非同步呼叫
目錄
引言
除了上篇文章所講的 ActiveMQ,還有一種流行的開源訊息中介軟體叫 RabbitMQ。和 ActiveMQ 相比,它具有更高的效能。
RabbitMQ 不再基於 JMS 規範,也沒有選擇 Java 作為底層實現語言。 它基於另一種訊息通訊協議,名為 AMQP,並採用 Erlang 語言作為技術實現。 RabbitMQ 提供了眾多語言客戶端,能夠與 Spring 框架整合,Spring Boot 也提供了對 RabbitMQ 的支援。
RabbitMQ 官網: http://www.rabbitmq.com
啟動 RabbitMQ 伺服器
執行 rabbitmq 容器
RabbitMQ 官方已經提供了自己的 Docker 容器,先下載 rabbitmq:3-management 映象來啟動 RabbitMQ 容器, 之所以選擇這個映象是因為它擁有一個 web 控制檯,可以通過瀏覽器來訪問。
docker pull rabbitmq:3-management
RabbitMQ 除了控制檯,還提供了 HTTP API 方式,可方便應用程式使用。
下面使用如下 Docker 命令啟動 RabbitMQ
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management
在啟動 RabbitMQ 容器時,它對宿主機暴露了兩個埠號
- 15672: 表示RabbitMQ 控制檯埠號,可在瀏覽器中通過控制檯來執行 RabbitMQ 的相關操作
- 5672 表示 RabbitMQ 監聽的TCP 埠號,應用程式可以通過該埠號與 RabbitMQ 建立 TCP 連線,並完成後續的非同步訊息通訊
此外,啟動時還有兩個環境變數
- RABBITMQ_DEFAULT_USER : 設定控制檯預設使用者名稱, 預設為 guest
- RABBITMQ_DEFAULT_PASS: 設定控制檯預設密碼,預設為 guest
RabbitMQ 控制檯
RabbitMQ 容器啟動完畢後,開啟瀏覽器,並在位址列中輸入 http://localhost:15672/
在上面管理介面中,包含 6 個功能選單
- Overview: 用於檢視 RabbitMQ 基本資訊
- Connections: 用於檢視 RabbitMQ 客戶端的連線資訊
- Channels: 用於檢視 RabbitMQ 的通道
- Exchanges:用於檢視 RabbitMQ 的交換機
- Queues: 用於檢視 RabbitMQ 的佇列
- Admin: 用於管理 RabbitMQ 的使用者,虛擬主機,策略等資料
Exchange 和 Queue
RabbitMQ 只有 Queue, 沒有 Topic,因為可通過 Exchange 與 Queue 的組合來實現 Topic 所具備的功能。RabbitMQ 的訊息模型如下圖所示
在 Exchange 和 Queue 間有一個 Binding 關係,當訊息從 Producer 傳送到 Exchange 中時,會根據 Binding 來路由訊息的去向。
- 如果 Binding 各不相同,那麼該訊息將路由到其中一個 Queue 中,隨後將被一個 Consumer 所消費,此時實現了 "點對點"的訊息通訊模型。
- 如果 Binding 完全相同,那麼該訊息就會路由到每個 Queue 中,隨後將會被每個 Consumer 消費,這樣就實現了 “釋出與訂閱” 的訊息通訊模型
因此可將 Binding 理解為 Exchange 到 Queue 的路由規則,這些規則可通過 RabbitMQ 所提供的客戶端 API 來控制,也可通過 RabbitMQ 提供的控制檯來管理。
RabbitMQ 提供了一個預設的 Exchange(AMQP default),在控制檯的 Exchange 選單中就可以看到它,簡單情況下,只需要使用預設的 Exchange 即可,當需要提供釋出與訂閱功能時才會使用自定義的 Exchange。
開發服務端和客戶端
下面我們就將 Spring Boot 與 RabbitMQ 進行整合,先開發一個服務端作為訊息的消費者,再開發一個客戶端作為訊息的生產者,隨後執行客戶端,並檢視服務端中接收到的訊息。
開發服務端
建立一個名為 rabbitmq-hello-server 的 maven 專案或者 Spring Starter Project, 在 pom.xml 檔案中新增下面 Maven 依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.19.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
Spring Boot 框架中已經添加了對 RabbitMQ 的支援,只需要依賴 spring-boot-starter-amqp
就可以啟動 RabbitMQ,此時還需要在 application.properties
配置檔案中新增 RabbitMQ 的相關配置項
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
接下來建立 HelloServer 類,封裝服務端程式碼
package demo.msa.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class HelloServer {
@RabbitListener(queues = "hello-queue")
public void receive(String message) {
System.out.println(message);
}
}
只需要在 receive()
方法上定義 @RabbitListener
,並且設定 queues
引數來指定消費者需要監聽的的佇列名稱。
最後,編寫一個 Spring Boot 應用程式啟動類來啟動伺服器
package demo.msa.rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitmqHelloServerApplication {
@Bean
public Queue helloQueue() {
return new Queue("hello-queue");
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqHelloServerApplication.class, args);
}
}
在 RabbitMQ 中,必須通過程式來顯式建立佇列。服務端啟動完畢後,將持續監聽 RabbitMQ 的 hello-queue 佇列中即將到來的訊息,該訊息由客戶端來發送。
開發客戶端
建立一個名為 rabbitmq-hello-client 的 maven 專案或者 Spring Starter Project, pom 中的依賴與服務端一致。客戶端的 application.properties 檔案與服務端一致。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.19.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
接下來建立一個名為 HelloClient 的類,將其作為客戶端
package demo.msa.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HelloClient {
@Autowired
private RabbitTemplate rabbitmqTemplate;
public void send(String message) {
rabbitmqTemplate.convertAndSend("hello-queue", message);
}
}
最後編寫 Spring Boot 應用程式啟動類來啟動客戶端
package demo.msa.rabbitmq;
import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitmqHelloClientApplication {
@Autowired
private HelloClient helloClient;
@Bean
public Queue helloQueue() {
return new Queue("hello-queue");
}
@PostConstruct
public void init() {
helloClient.send("hello world!");
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqHelloClientApplication.class, args).close();
}
}
與服務端一樣,此處使用 @Bean
註解的 helloQueue()
方法建立一個名為 hello-queue
的佇列,這樣可以保證當客戶端在服務端之前啟動時,也能建立所需的佇列。而且 RabbitMQ 可以確保不會建立同名的佇列,因此可分別在服務端與客戶端建立同名的佇列。
執行 main 方法可以啟動客戶端應用程式,此時將在服務端看到客戶端傳送的訊息,也可以在 RabbitMQ 控制檯中看到訊息隊列當前的狀態。
Java Bean 型別傳輸
上面傳送和接收的訊息只是 String 型別,如果傳送的訊息是一個普通的 Java Bean 型別,應該如何呼叫呢?
Java Bean 型別則必須實現 Serializable
序列化接口才能正常呼叫,這是因為 RabbitMQ 所傳送的訊息是 byte[]
型別,當客戶端傳送訊息需要進行序列化(也就是講 Java 型別轉換為 byte[] 型別),當服務端接收訊息前需要先反序列化,因此傳送和接收的訊息物件必須實現 JDK 的序列化介面。
除了這種序列化方式外,我們也可以使用 Jackson 來實現,而且 RabbitMQ 已經為我們提供了 jackson 序列化的方式,這種方式更加高效。所需要做的是定義一個 Jackson2JsonMessageConverter
的 Spring Bean。
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
結語
RabbitMQ 的效能非常高效和穩定,也能非常方便的與 Spring Boot 應用程式整合,還擁有非常豐富的官方文件和控制檯,因此選擇 RabbitMQ 作為服務之間的非同步訊息呼叫平臺,將成為整個微服務架構中的 "訊息中心"。
參考
- 《架構探險—輕量級微服務架構》