使用 ActiveMQ 實現JMS 非同步呼叫
目錄
簡介
服務之間的同步呼叫,可以使用 HTTP 或 RPC 來完成,但並非所有的呼叫都需要同步,有些場景下,當客戶端呼叫服務端時,並不需要等待服務端做出響應,此時就應該使用非同步呼叫。非同步呼叫的常用方式是基於 MQ (Message Queue) 來實現的。下文會以 ActiveMQ 為例進行講解。
ActiveMQ 是 Java 世界中最為流行的開源訊息中介軟體,它不僅功能強大,而且效能穩定。它可全面支援 JMS(Java 訊息服務)技術規範,為 Java 應用程式提供標準的 JMS API。
此外 ActiveMQ 具備與 Spring 框架整合的能力,它一直都是 Spring 應用程式的訊息中介軟體標配。同樣, Spring Boot 也提供了 ActiveMQ 的開箱即用的外掛,只需要幾項配置,就能接入 ActiveMQ,並輕鬆使用 JMS API 編寫非同步訊息通訊程式。
Active MQ 官網地址如下
http://activemq.apache.org
啟動 ActiveMQ 伺服器
先使用 docker 安裝 ActiveMQ ,目前 ActiveMQ 官方並未提供相應的 Docker 映象,我們選擇使用第三方映象 webcenter/activemq
docker pull webcenter/activemq:5.14.3
接下來執行 ActiveMQ
docker run -d -p 8161:8161 -p 61616:61616 -e ACTIVEMQ_ADMIN_LOGIN=admin -e ACTIVEMQ_ADMIN_PASSWORD=admin --name activemq webcenter/activemq:5.14.3
在啟動 ActiveMQ 容器時,容器對宿主機暴露了兩個埠號:
- 8161: 表示 ActiveMQ 控制檯埠號,可在瀏覽器中通過控制檯來執行 ActiveMQ 的相關操作
- 61616: 表示 ActiveMQ 所監聽的 TCP 埠號,應用程式可通過該埠號與 ActiveMQ 建立 TCP 連線,並完成後續的非同步訊息通訊
此外,在啟動 ActiveMQ 容器時,還提供了兩個環境變數
- ACTIVEMQ_ADMIN_LOGIN: 用於設定控制檯管理員的使用者名稱,預設為 admin
- ACTIVEMQ_ADMIN_PASSWORD: 用於設定控制檯管理員的密碼,預設為 admin
檢視控制檯
webcenter/activemq
映象擁有一個基於 Web 的控制檯,可通過瀏覽器訪問。容器啟動完畢後,可以開啟瀏覽器,並在位址列中輸入 http://localhost:8161
點選 Manage ActiveMQ borker 連結,瀏覽器將彈出一個對話方塊,此時輸入使用者名稱和密碼,認證通過後會進入管理介面
在管理介面中,包括 8 個功能選單
- Home: 基本資訊
- Queues: 管理的佇列
- Topics: 檢視所管理的主題
- Subscribers: 檢視相關主題的訂閱者
- Connections: 檢視客戶端的連線資訊
- Network: 檢視網路相關資訊
- Scheduled: 檢視 ActiveMQ 內部執行的定時任務
- Send: 通過表單方式查看向佇列或主題傳送具體訊息
ActiveMQ 的訊息通道
ActiveMQ 管理了兩類訊息通道,一類是佇列(Queue),另一類叫做主題(Topic)。
Queue
Queue 用於解決訊息的 點對點 通訊問題,也就是說,訊息從生產者(Producer) 發出後,首先進入 ActiveMQ 某個指定的 Queue 中,然後再將訊息傳送給其中一個消費者(Consumer)。
Topic
Topic 用於解決訊息的釋出與訂閱(Publish-subscribe) 通訊問題,也就是說,訊息從 Producer 發出後,首先將其釋出到 ActiveMQ 某個指定的 Topic 上,然後將此訊息分發給每個訂閱者(Subscriber) 。
比較
在具體場合下,靈活使用以上兩種通訊模式來實現 Producer 與 Consumer/Subscriber 間的非同步呼叫,從而解決呼叫方的耦合問題。可見,Queue 能解決呼叫緩衝問題,Topic 能解決訊息廣播問題, Queue 與 Topic 都能解決掉呼叫耦合問題,這些技術都為一個好的軟體架構提供了有效的支撐。
開發生產者和消費者
下面就以 Queue 為例,將 ActiveMQ 與 Spring Boot 進行整合,將 Producer 作為客戶端, Consumer 作為服務端,通過 Queue 實現客戶端與服務端的非同步呼叫
開發服務端(消費者)
首先建立一個名為 acitvemq-hello-server
的 spring boot 專案,如果在 eclipse 中安裝了 Spring Tools ,可以在新建時選擇 New Spring Starter Project
選項。或者新建 Maven 工程。對應的 maven 依賴如下
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.19.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
在 Spring Boot 框架中已經內建了對 ActiveMQ 的支援,我們只需要依賴 spring-boot-starter-mq 就能啟動 ActiveMQ,此時還需要在 application.properties
檔案中新增 ActiveMQ 配置項
spring.activemq.broker-url=tcp://10.104.10.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
接下來建立 HelloServer
的類,封裝服務端相關程式碼
@Component
public class HelloServer {
@JmsListener(destination="hello-queue")
public void receive(String message) {
System.out.println(message);
}
}
使用 @Component
註解,說明它可被 Spring IoC 容器所管理。此時只需要使用 @JmsListener
註解,並將其繫結到 receive()
方法上,就能從 ActiveMQ 中接收響應的訊息。
@JmsListener
註解中需要新增一個 destination 屬性來指定 Queue/Topic
的名稱,該名稱具有唯一性。訊息將以一個 String 型別引數的形式傳入方法體中,也可以接收其他型別的訊息,這取決於客戶端傳送的訊息是哪種型別。Spring JMS 將訊息放入 ActiveMQ 時會進行序列化,當訊息從 ActiveMQ 取出時將進行反序列化,應用程式無需關注這些底層細節,只需要將精力放在業務邏輯上。
最後,編寫一個 Spring Boot 應用程式啟動類來啟動服務端(使用 spring tools 工具會自動生成)
@SpringBootApplication
public class ActivemqHelloServerApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqHelloServerApplication.class, args);
}
}
當服務端啟動完畢後,將一直監聽 ActiveMQ 的 hello-queue 佇列中即將到來的訊息,訊息由客戶端來發送。
開發客戶端(生產者)
建立一個名為 active-mq-client
的 Maven 專案, pom.xml 檔案內容與服務端相似。application.properties 檔案與服務端相同。
接下來建立一個名為 HelloClient
的類,將其作為客戶端。
@Component
public class HelloClient {
@Autowired
private JmsTemplate jmsTemplate;
public void send(String message) {
jmsTemplate.convertAndSend("hello-queue", message);
}
}
這裡使用了 @Autowired
註解, JmsTemplate 物件注入進來,還編寫了一個 send()
方法,在該方法中呼叫 JmsTemplate 物件的 convertAndSend
來轉換併發送訊息。
最後使用 Spring Boot 應用程式啟動類來啟動客戶端
@SpringBootApplication
public class ActivemqHelloClientApplication {
@Autowired
private HelloClient helloClient;
@PostConstruct
public void init() {
helloClient.send("hello world");
}
public static void main(String[] args) {
SpringApplication.run(ActivemqHelloClientApplication.class, args);
}
}
需要注意的是, init()
方法帶有 @PostConstruct
註解,表示 Spring IoC 容器例項化 ActivemqHelloClientApplication
類後將呼叫該方法。
執行 main() 方法可以啟動客戶端應用程式,並可以在服務端應用程式控制臺中看到 client 傳送的訊息,也可以在 ActiveMQ 控制檯中檢視佇列的當前狀態
Queue 表格中列明的含義如下
- Name 表示佇列名稱,可在應用程式中自動建立,也可在 ActiveMQ 控制檯中手動建立
- Number Of Pending Messages 表示阻塞在佇列中未經消費的訊息條數
- Number Of Consumers 表示正在與 ActiveMQ 建立連線的消費者數量
- Messages Enqueued 表示進入佇列的訊息數量
- Messages Dequeued 表示離開佇列的訊息數量
此外,還有下面幾種操作
- Browser 用於檢視當前佇列中訊息的相關細節
- Active Consumers 用於檢視當前活動消費者的相關資訊
- Active Producers 用於檢視當前活動生產者的相關資訊
- Send To 用於向當前佇列中傳送具體訊息
- Purge 用於清空佇列中的訊息
- Delete 用於刪除當前佇列
參考
- 《架構探險—輕量級微服務架構》