1. 程式人生 > >使用 RabbitMQ 實現非同步呼叫

使用 RabbitMQ 實現非同步呼叫

目錄

引言

除了上篇文章所講的 ActiveMQ,還有一種流行的開源訊息中介軟體叫 RabbitMQ。和 ActiveMQ 相比,它具有更高的效能。

RabbitMQ 不再基於 JMS 規範,也沒有選擇 Java 作為底層實現語言。 它基於另一種訊息通訊協議,名為 AMQP,並採用 Erlang 語言作為技術實現。 RabbitMQ 提供了眾多語言客戶端,能夠與 Spring 框架整合,Spring Boot 也提供了對 RabbitMQ 的支援。

RabbitMQ 官網: http://www.rabbitmq.com

啟動 RabbitMQ 伺服器

執行 rabbitmq 容器

RabbitMQ 官方已經提供了自己的 Docker 容器,先下載 rabbitmq:3-management 映象來啟動 RabbitMQ 容器, 之所以選擇這個映象是因為它擁有一個 web 控制檯,可以通過瀏覽器來訪問。

docker pull rabbitmq:3-management

RabbitMQ 除了控制檯,還提供了 HTTP API 方式,可方便應用程式使用。

下面使用如下 Docker 命令啟動 RabbitMQ

 docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

在啟動 RabbitMQ 容器時,它對宿主機暴露了兩個埠號

  • 15672: 表示RabbitMQ 控制檯埠號,可在瀏覽器中通過控制檯來執行 RabbitMQ 的相關操作
  • 5672 表示 RabbitMQ 監聽的TCP 埠號,應用程式可以通過該埠號與 RabbitMQ 建立 TCP 連線,並完成後續的非同步訊息通訊

此外,啟動時還有兩個環境變數

  • RABBITMQ_DEFAULT_USER : 設定控制檯預設使用者名稱, 預設為 guest
  • RABBITMQ_DEFAULT_PASS: 設定控制檯預設密碼,預設為 guest

RabbitMQ 控制檯

RabbitMQ 容器啟動完畢後,開啟瀏覽器,並在位址列中輸入 http://localhost:15672/

,並且輸入登入的使用者名稱和密碼,就可以看到控制檯如下所示

在上面管理介面中,包含 6 個功能選單

  • Overview: 用於檢視 RabbitMQ 基本資訊
  • Connections: 用於檢視 RabbitMQ 客戶端的連線資訊
  • Channels: 用於檢視 RabbitMQ 的通道
  • Exchanges:用於檢視 RabbitMQ 的交換機
  • Queues: 用於檢視 RabbitMQ 的佇列
  • Admin: 用於管理 RabbitMQ 的使用者,虛擬主機,策略等資料

Exchange 和 Queue

RabbitMQ 只有 Queue, 沒有 Topic,因為可通過 Exchange 與 Queue 的組合來實現 Topic 所具備的功能。RabbitMQ 的訊息模型如下圖所示

在 Exchange 和 Queue 間有一個 Binding 關係,當訊息從 Producer 傳送到 Exchange 中時,會根據 Binding 來路由訊息的去向。

  • 如果 Binding 各不相同,那麼該訊息將路由到其中一個 Queue 中,隨後將被一個 Consumer 所消費,此時實現了 "點對點"的訊息通訊模型。
  • 如果 Binding 完全相同,那麼該訊息就會路由到每個 Queue 中,隨後將會被每個 Consumer 消費,這樣就實現了 “釋出與訂閱” 的訊息通訊模型

因此可將 Binding 理解為 Exchange 到 Queue 的路由規則,這些規則可通過 RabbitMQ 所提供的客戶端 API 來控制,也可通過 RabbitMQ 提供的控制檯來管理。

RabbitMQ 提供了一個預設的 Exchange(AMQP default),在控制檯的 Exchange 選單中就可以看到它,簡單情況下,只需要使用預設的 Exchange 即可,當需要提供釋出與訂閱功能時才會使用自定義的 Exchange。

開發服務端和客戶端

下面我們就將 Spring Boot 與 RabbitMQ 進行整合,先開發一個服務端作為訊息的消費者,再開發一個客戶端作為訊息的生產者,隨後執行客戶端,並檢視服務端中接收到的訊息。

開發服務端

建立一個名為 rabbitmq-hello-server 的 maven 專案或者 Spring Starter Project, 在 pom.xml 檔案中新增下面 Maven 依賴

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

Spring Boot 框架中已經添加了對 RabbitMQ 的支援,只需要依賴 spring-boot-starter-amqp 就可以啟動 RabbitMQ,此時還需要在 application.properties 配置檔案中新增 RabbitMQ 的相關配置項

spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

接下來建立 HelloServer 類,封裝服務端程式碼

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloServer {

    @RabbitListener(queues = "hello-queue")
    public void receive(String message) {
        System.out.println(message);
    }
}

只需要在 receive() 方法上定義 @RabbitListener ,並且設定 queues 引數來指定消費者需要監聽的的佇列名稱。

最後,編寫一個 Spring Boot 應用程式啟動類來啟動伺服器

package demo.msa.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqHelloServerApplication {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqHelloServerApplication.class, args);
    }

}

在 RabbitMQ 中,必須通過程式來顯式建立佇列。服務端啟動完畢後,將持續監聽 RabbitMQ 的 hello-queue 佇列中即將到來的訊息,該訊息由客戶端來發送。

開發客戶端

建立一個名為 rabbitmq-hello-client 的 maven 專案或者 Spring Starter Project, pom 中的依賴與服務端一致。客戶端的 application.properties 檔案與服務端一致。

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

接下來建立一個名為 HelloClient 的類,將其作為客戶端

package demo.msa.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloClient {
    
    @Autowired
    private RabbitTemplate rabbitmqTemplate;
    
    public void send(String message) {
        rabbitmqTemplate.convertAndSend("hello-queue", message);
    }
}

最後編寫 Spring Boot 應用程式啟動類來啟動客戶端

package demo.msa.rabbitmq;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqHelloClientApplication {
    
    @Autowired
    private HelloClient helloClient;
    
    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }
    
    @PostConstruct
    public void init() {
        helloClient.send("hello world!");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqHelloClientApplication.class, args).close();
    }

}

與服務端一樣,此處使用 @Bean 註解的 helloQueue() 方法建立一個名為 hello-queue 的佇列,這樣可以保證當客戶端在服務端之前啟動時,也能建立所需的佇列。而且 RabbitMQ 可以確保不會建立同名的佇列,因此可分別在服務端與客戶端建立同名的佇列。

執行 main 方法可以啟動客戶端應用程式,此時將在服務端看到客戶端傳送的訊息,也可以在 RabbitMQ 控制檯中看到訊息隊列當前的狀態。

Java Bean 型別傳輸

上面傳送和接收的訊息只是 String 型別,如果傳送的訊息是一個普通的 Java Bean 型別,應該如何呼叫呢?

Java Bean 型別則必須實現 Serializable 序列化接口才能正常呼叫,這是因為 RabbitMQ 所傳送的訊息是 byte[] 型別,當客戶端傳送訊息需要進行序列化(也就是講 Java 型別轉換為 byte[] 型別),當服務端接收訊息前需要先反序列化,因此傳送和接收的訊息物件必須實現 JDK 的序列化介面。

除了這種序列化方式外,我們也可以使用 Jackson 來實現,而且 RabbitMQ 已經為我們提供了 jackson 序列化的方式,這種方式更加高效。所需要做的是定義一個 Jackson2JsonMessageConverter 的 Spring Bean。

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

結語

RabbitMQ 的效能非常高效和穩定,也能非常方便的與 Spring Boot 應用程式整合,還擁有非常豐富的官方文件和控制檯,因此選擇 RabbitMQ 作為服務之間的非同步訊息呼叫平臺,將成為整個微服務架構中的 "訊息中心"。

參考

  • 《架構探險—輕量級微服務架構》