1. 程式人生 > >SpringBoot RabbitMQ 整合使用

SpringBoot RabbitMQ 整合使用

前提

上次寫了篇文章,《SpringBoot Kafka 整合使用》,閱讀量還挺高的,於是想想還是把其他幾種 MQ 也和 SpringBoot 整合使用下。

下面是四種比較流行的 MQ :

後面都寫寫和 SpringBoot 整合的文章。

安裝 RabbitMQ

由於換 Mac 了,所以一些環境就直接在 Mac 搞,但是像安裝 RabbitMQ 這些又會把自己電腦系統給搞的太亂,所以能在 Docker 裡面安裝就安裝在 Docker,這次 RabbitMQ 我也直接在 Docker 裡安裝。

啟動 Docker for Mac,如果沒安裝過的請看我上一篇文章:http://www.54tianzhisheng.cn/...

當然你也可以在自己的 Linux 伺服器或者虛擬機器裡啟動安裝 RabbitMQ 。

Docker 安裝的話很簡單,因為 RabbitMQ 官方已經提供了自己的 Docker 容器,只需要一行命令:(可右移檢視完整程式碼)

docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

該映象擁有一個基於 web 的控制檯和 Http API。Http API 可以在地址看到如何使用:http://localhost:15672/api/

講解下上面命令列:

  • 15672 :表示 RabbitMQ 控制檯埠號,可以在瀏覽器中通過控制檯來執行 RabbitMQ 的相關操作。
  • 5672 : 表示 RabbitMQ 所監聽的 TCP 埠號,應用程式可通過該埠與 RabbitMQ 建立 TCP 連線,並完成後續的非同步訊息通訊
  • RABBITMQDEFAULTUSER:用於設定登陸控制檯的使用者名稱,這裡我設定 admin
  • RABBITMQDEFAULTPASS:用於設定登陸控制檯的密碼,這裡我設定 admin

容器啟動成功後,可以在瀏覽器輸入地址:http://localhost:15672/ 訪問控制檯

登陸後:

簡單描述下上圖中中控制檯的列表的作用:

  • Overview :用於檢視 RabbitMQ 的一些基本資訊(訊息佇列、訊息傳送速率、節點、埠和上下文資訊等)
  • Connections:用於檢視 RabbitMQ 客戶端的連線資訊
  • Channels:使用者檢視 RabbitMQ 的通道資訊
  • Exchange:用於檢視 RabbitMQ 交換機
  • Queues:用於檢視 RabbitMQ 的佇列
  • Admin:用於管理使用者,可增加使用者

建立專案

在 IDEA 中建立一個 SpringBoot 專案結構:

SpringBoot 框架中已經內建了對 RabbitMQ 的支援,如果你看過官方文件的話,就可以看到的,我們需要把依賴 spring-boot-starter-amqp 引入就行。

1、 pom.xml 引入依賴後如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhisheng</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>rabbitmq</name>
    <description>Demo project for Spring Boot RabbitMQ</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2、application.properties 配置修改如下:

spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

3、訊息傳送類 RabbitMQClient.java

package com.zhisheng.rabbitmq.client;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/** * Created by zhisheng_tian on 2018/1/23 */
@Componentpublic
class RabbitMQClient {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String message) { rabbitTemplate.convertAndSend("zhisheng", message); }
}

就這樣,傳送訊息程式碼就實現了。

這裡關鍵的程式碼為 rabbitTemplate.convertAndSend() 方法, zhisheng 這個是路由規則(routingKey),它的值表明將訊息傳送到指定的佇列 zhisheng 中去,這裡跟了下原始碼,發現 convertAndSend() 方法最後呼叫的方法其實是一個 doSend() 方法。

4、訊息接收類

package com.zhisheng.rabbitmq.server;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** * Created by zhisheng_tian on 2018/1/23 */
@Componentpublic
class RabbitMQServer {
    @RabbitListener(queues = "zhisheng")
    public void receive(String message) { System.out.println("收到的 message 是:" + message); }
}

你看,這裡就有個 RabbitListener 一直在監聽著佇列 zhisheng 。

當然這個佇列是必須要我們自己在應用程式中建立好,它不會像我之前寫的文章 《SpringBoot Kafka 整合使用》 中的 Kafka 一樣,Kafka 它會在用到佇列的時候動態的建立,不需要我們提前建立好。

那麼在 RabbitMQ 中該如何建立佇列呢?

如上圖所示:這樣我們就建立好了一個 zhisheng 的佇列,當程式開始執行時,訊息接收類會持續監聽佇列 zhisheng 中即將到來的訊息。

5、執行專案

需要在啟動類中注入傳送訊息的類,並且提供 init 方法,在 init 方法中呼叫傳送訊息類的 send() 方法

@PostConstructpublic
void init() { rabbitMQClient.send("傳送訊息----zhisheng-----"); }

需要注意的是:init() 方法帶有 @PostConstruct 註解,被 @PostConstruct 修飾的方法會在建構函式之後執行。

啟動專案就可以發現控制檯已經接收到訊息了。

6、單執行緒測試效能

看到上面圖片中註釋掉的程式碼沒?那就是用來測試訊息傳送的效能的,我傳送 10000 條訊息看看總共耗時多少。

10000 條訊息傳送耗時:215ms。這是在單執行緒下,下次可以和其他的 MQ 測試對比下,並且也可以在多執行緒的環境下測試效能。

同時從控制檯可以看到傳送的速率:

7、多執行緒測試效能

開了10 個執行緒,每個執行緒傳送 10000 條訊息。

init 方法程式碼如下:

@PostConstruct
public void init() {
    StopWatch stopWatch = new StopWatch(); stopWatch.start();
    int threads = 10; ExecutorService executorService = Executors.newFixedThreadPool(threads);
    final CountDownLatch start = new CountDownLatch(1); final CountDownLatch end = new CountDownLatch(threads);
    for (int i = 0; i < threads; i++) {
        executorService.execute(( ) -> {
        try {
            start.await(); for (int i1 = 0; i1 < 10000; i1++) {
                rabbitMQClient.send("傳送訊息----zhisheng-----");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            end.countDown();
        }
    } );
    }
    start.countDown(); try {
        end.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        executorService.shutdown();
    }

    stopWatch.stop(); System.out.println("傳送訊息耗時:" + stopWatch.getTotalTimeMillis());
}

耗時:4063ms

控制檯顯示如下圖:

8、注意

這裡測試傳送的訊息直接是 String 型別的,你也可以測試下 Bean 類,這需要注意需要序列化。

推薦閱讀:

《深入理解 Java 記憶體模型》讀書筆記

面試-基礎篇

Spring Boot 2.0 遷移指南

SpringBoot使用Docker快速部署專案

為什麼選擇 Spring 作為 Java 框架?

SpringBoot RocketMQ 整合使用和監控

Spring Boot 面試的十個問題

使用 Spring Framework 時常犯的十大錯誤

SpringBoot Admin 使用指南

上篇好文:

SpringBoot Kafka 整合使用