RabbitMQ,想說愛你不容易(附詳細安裝教程)
阿新 • • 發佈:2020-12-06
# 前言
本文講述的只是主要是 `RabbitMQ` 的入門知識,學習本文主要可以掌握以下知識點:
- MQ 的發展史
- AMQP 協議
- Rabbit MQ 的安裝
- Rabbit MQ 在 Java API 中的使用
- RabbitMQ 與 SpringBoot 的整合
# MQ 的誕生歷史
大部分技術的剛產生時適用範圍都是特定的。比如網際網路的產生,剛開始出現的通訊協議各個產商之間是無法相容的,隨著歷史的發展,產生了業內的通訊標準`tcp/ip`協議,而`MQ`也是一樣,第一款 `MQ` 類軟體是由一個在美國印度人 `Vivek Ranadive` 創辦的一家公司 `Teknekron`,並實現了世界上第一個訊息中介軟體 `The Information Bus(TIB)`。
隨著第一款`MQ` 類軟體`TIB`的誕生,各大廠商立刻跟進,百花爭鳴,湧現了一批`MQ`類軟體,比如`IBM`開發的`IBM Wesphere`,微軟開發的`MSMQ`等等,但是正因為標準不統一,這就給我們使用者帶來了很大的不便,每次切換`MQ`時都需要重複去實現不同的協議和不同API的呼叫。
`2001`年,`Java`語言的老東家`Sun`公司釋出了一個`JMS`規範,其在各大產商的`MQ`上進行了統一封裝,使用者如果需要使用不同的`MQ`只需要選擇不同的驅動就可以了(和我們使用資料庫驅動一個道理)。`JMS`規範雖然統一了標準,但是`JMS`規範卻有一個很大的缺陷就是它是和`Java`語言進行繫結的,所以依然沒有從根本上解決問題。
`2004`年,`AMQP`規範出現了,真正做到了跨語言和跨平臺,自此`MQ`迎來了發展的黃金時代。
`2007`年,`Rabbit`公司基於`AMQP`規範開發出了一款訊息佇列`RabbmitMQ`。很快的,`RabbitMQ`就得到了大家的喜愛,被使用者廣泛使用。
# 什麼是 MQ
`MQ`即:`Message Queue`,稱之為訊息佇列或者訊息中介軟體。`MQ`的本質是:使用高效可靠的訊息傳遞機制來進行與平臺無關的資料傳遞,並基於資料通訊來進行分散式系統的整合。也就是說`MQ`主要是用來解決訊息的通訊問題,其主要有以下三個特點:
- 1、`MQ`是一個獨立執行的服務。通過生產者來發送訊息,使用消費者來接收消費。
- 2、內部採用了佇列來進行訊息儲存,一般採用的均是先進先出(`FIFO`)佇列。
- 3、具有釋出訂閱的模型,消費者可以根據需要來獲取自己想要的訊息。
# 為什麼需要 MQ
以`Java`語言為例,`JDK`本身就提供了許多不同型別的佇列,那麼為什麼還需要用`MQ`呢?這是因為:
- 1、跨語言。各大程式語言內部實現的佇列是和語言繫結的,而且是單機的,在分散式環境下無法很好的工作,所以我們需要可以單獨部署不依賴於語言的`MQ`。
- 2、非同步解耦。訊息佇列可以實現非同步通訊,這樣傳送訊息方只需要關心訊息是否傳送成功,而接受訊息方只需要關心怎麼處理佇列中的訊息,實現了消費和生產者的解耦。
- 3、流量削峰。因為訊息佇列是先進先出,所以如果把需要消費的訊息放進佇列,那麼消費者就可以避免被瞬間大流量擊垮,而是可以從容的根據自己的能力從佇列中取出訊息進行消費。
# RabbitMQ
`RabbitMQ` 中的 `Rabbit` 是兔子的意思,就是形容跑的和兔子一樣快。其是一款輕量級的,支援多種訊息傳遞協議的高可用的訊息佇列。`RabbitMQ` 是由 `Erlang` 語言編寫的,而 `Erlang` 語言就是一款天生適合高併發的語言。
## RabbitMQ 的優勢和特性
`RabbitMQ` 作為一款非常流行的訊息中介軟體,其有著非常豐富的特性和優勢:
- 高可靠性:`RabbitMQ` 提供了持久化、傳送應答、釋出確認等功能來保證其可靠性。
- 靈活的路由:通過不同的交換機(Exchange)來實現了訊息的靈活路由。
- 叢集與擴充套件性:多個節點可以組成一個邏輯上的伺服器,支援負載。
- 高可用性:通過映象佇列實現了佇列中資料的複製,保證了在極端情況下部分節點出現 `crash` 整個叢集仍然可用。
- 支援多種協議:`RabbitMQ` 最初是為了支援 `AMQP` 協議而開發的,所以 `AMQP` 是其核心協議,但是其也支援其他如:`STOMP`,`MOTT`,`HTTP` 等協議。
- 支援多客戶端:`RabbitMQ` 幾乎支援所有常用語言客戶端,如:`Java`,`Python`,`Ruby`,`Go` 等。
- 豐富的外掛系統:支援各種豐富的外掛擴充套件,同時也支援自定義外掛。比如最常用的 `RabbitMQ` 後臺管理系統就是以外掛的形式實現的。
## AMQP 模型
[AMQP](https://www.amqp.org/) 全稱是:Advanced Message Queuing Protocol。`RabitMQ` 最核心的協議就是基於 `AMQP` 模型的 `AMQP` 協議,`AMQP` 模型目前最新的版本是 `1.0` 版本,但是目前官方推薦使用者的最佳版本仍是基於 `0.9.1` 版本的 `AMQP` 模型,`0.9.1` 版本在 `RabbitMQ`官網中也將其稱之為 `AMQP 0-9-1`模型。
`AMQP 0-9-1`(高階訊息佇列協議)是一種訊息傳遞協議,它允許符合標準的客戶端應用程式與符合標準的訊息傳遞中介軟體代理進行通訊。訊息傳遞代理(Broker)從釋出者(Publisher,即釋出訊息的應用程式,也稱為生產者:Producter)接收訊息,並將其路由到使用者(消費者:Consumer,即處理訊息的應用程式)。
`AMQP 0-9-1` 模型的核心思想為:訊息被髮布到交換處,通常被比作郵局或郵箱。然後,交換機使用稱為繫結的規則將訊息副本分發到佇列。然後,代理將訊息傳遞給訂閱了佇列的使用者,或者使用者根據需要從佇列中獲取/提取訊息。
下圖就是一個 `AMQP` 模型簡圖,理解了這幅圖,那麼就基本理解了 `RabbitMQ` 的工作模式。
![](https://img2020.cnblogs.com/blog/2232223/202012/2232223-20201206181726801-15609555.png)
### Producer 和 Consumer
`Producer ` 即生產者,一般指的是應用程式客戶端,生產者會產生訊息傳送給 `RabbitMQ`,等待消費者進行處理。
`Consumer` 即消費者,消費者會從特定的佇列中取出訊息,進行消費。當訊息傳遞給消費者時,消費者會自動通知 `Broker`,`Broker` 只有在收到關於該訊息的通知時才會從佇列中完全刪除該訊息。
### Connection:我是一個 TCP 長連線
生產者傳送訊息和消費者接收訊息之前都必須要和 `Broker` 建立一個 `tcp` 長連線,才能進行通訊。
### Channel:我是被虛構出來的
訊息佇列的作用之一就是用來做削峰,所以訊息佇列在高併發場景可能會有大量的生產者和消費者,那麼假如每一個生產者在傳送訊息時或者每一個消費者在消費訊息時都需要不斷的建立和銷燬 `tcp` 連線,那麼這對 `Broker` 會是一個很大的消耗,為了降低這個 `tcp` 連線的建立頻率,`AMQP` 模型引入了 `Channel`(通道或者通道)。
`Channel` 是一個虛擬的的連線,可以被認為是“輕量級的連線,其共享同一個 `tcp` 連線”。在同一個 `tcp` 長連線裡面可以通過建立和銷燬不同的 `Channel` 來減少了建立和銷燬 `tcp` 連線的頻率,從而大大減少了資源的消耗。
客戶端(生產者/消費者)執行的每個協議操作都發生在通道上。特定 `Channel` 上的通訊完全獨立於另一個 `Channel` 上的通訊,因此每個協議方法還攜帶一個`Channel ID`(又稱通道號)。
`Channel` 只存在於連線的上下文中,不會獨立存在,所以當一個 `tcp` 連線被關閉時,其中所有 `Channel` 也都會被關閉。
`Channel` 是執行緒不安全的,所以對於使用多個執行緒/程序進行處理的應用程式,需要為每個執行緒/程序建立一個 `Channel`,而不是共享同一個 `Channel`。
### Broker:我只是一個普通的代理商
`Broker` 直接翻譯成中文就是:中介/代理,所以如果我們使用的是 `RabbitMQ`,那麼這個 `Broker` 就是指的 `RabbitMQ` 服務端。
### Exchange:我只是做一個訊息的對映
`Echange` 即交換機,因為要實現生產者和消費者的多對多關係,所以只有一個佇列是無法滿足要求的,那麼如果有多個佇列,每次我們傳送的訊息應該儲存到哪裡呢?交換機就是起到了中間角色的作用,我們傳送訊息到交換機上,然後通過交換機發送到對應的佇列,交換機和佇列之間需要提前繫結好對應關係,這樣訊息就到了各自指定的佇列內,然後消費者就可以直接從各自負責的佇列內取出訊息進行消費。
### Queue:我才是真正儲存訊息的地方
訊息傳送到 `Broker` 之後,通過交換機的對映,儲存到指定的 `Queue`裡面。
### VHost:我只是一個名稱空間而已
`VHost` 類似於名稱空間,主要作用就是用來隔離資料的,比如我們由很多個業務系統都需要用到 `RabbitMQ`,如果是一臺伺服器完全可以滿足要求,那就沒必要安裝多個 `RabbitMQ` 了,這時候就可以定義不同的 `VHost` ,不同的 `VHost` 就可以實現各個業務系統間的資料隔離。
## RabbitMQ 的安裝
`RabbitMQ` 是用 `Erlang` 語言開發的,所以在安裝 `RabbitMQ` 之前,需要先安裝 `Erlang`,`RabbitMQ` 和 `Erlang` 之間有[版本對應關係](https://www.rabbitmq.com/which-erlang.html),這個需要注意,本文以 `Erlang 21.3` 和 `RabbitMQ3.8.4` 為例進行安裝 。
1. 安裝 `Erlang` :
```java
yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget //提前安裝一些依賴,個人電腦依賴不同,可根據實際情況選擇未安裝的依賴進行安裝
wget http://erlang.org/download/otp_src_21.3.tar.gz # 下載(也可以下載好傳到伺服器)
tar -xvf otp_src_21.3.tar.gz //解壓
mkdir erlang //在指定目錄,如/usr/local下建立erlang目錄
cd otp_src_21.3 //切換到解壓後的目錄
./configure --prefix=/usr/local/erlang //編譯(路徑根據實際情況選擇)
make && make install //安裝
```
2. 配置 `Erlang` 環境變數:
```java
vim /etc/profile //編輯環境變數檔案(CentOS系統預設環境變數檔案,其他系統可能不一樣)
export PATH=$PATH:/usr/local/erlang/bin //在末尾加入環境變數配置(路徑根據實際情況選擇)
source /etc/profile //實時生效
```
3. 輸入 `erl` 驗證 `Erlang` 是否安裝成功。如果出現如下顯示版本號的介面則說明安裝成功(可以輸入 `halt().` 命令進行退出):
![](https://img2020.cnblogs.com/blog/2232223/202012/2232223-20201206181816530-151036683.png)
4. 安裝 `RabbitMQ`:
```java
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.8.4/rabbitmq-server-generic-unix-3.8.4.tar.xz //下載RabbitMQ
xz -d rabbitmq-server-generic-unix-3.8.4.tar.xz //解壓
tar -xvf rabbitmq-server-generic-unix-3.8.4.tar //解壓
```
5. 同樣的,這裡需要進行環境變數配置:
```java
vim /etc/profile //編輯環境變數檔案(CentOS系統預設環境變數檔案,其他系統可能不一樣)
export PATH=$PATH:/usr/local/rabbitmq_server-3.8.4/sbin //在末尾加入環境變數配置(路徑根據實際情況選擇)
source /etc/profile //實時生效
```
6. 啟動 `RabbitMQ` ,預設埠為 `6752`:
```java
/usr/local/rabbitmq_server-3.8.4/sbin/./rabbitmq-server -detached //在後臺啟動。根據自己實際路徑選擇,或者也可以選擇service或者systemctl等命令啟動
```
7. 如果沒有報錯則說明啟動成功,啟動之後預設會建立一個 `guest/guest` 賬戶,只能本地連線,所以還需要再重新建立一個使用者,並給新使用者授權(當然,我們也可以直接給 `guest` 使用者授權):
```java
./rabbitmqctl add_user admin 123456 //建立使用者admin
./rabbitmqctl set_user_tags admin administrator //新增標籤
./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" //授權
```
8. `RabbitMQ` 預設還提供了視覺化管理介面,需要手動開啟一下,預設埠為 `15672`:
```java
./rabbitmq-plugins enable rabbitmq_management //啟動後臺管理系統外掛(禁止的話換成disable即可)
```
9. 開啟外掛之後,可以通過訪問:`http://ip:15672/` 訪問後臺管理系統,並進行一些引數設定,賬號密碼就是上面新增的 `admin/123456`。
![](https://img2020.cnblogs.com/blog/2232223/202012/2232223-20201206181844356-458750130.png)
### 安裝過程常見錯誤
安裝過程中可能會出現如下圖所示錯誤:
![](https://img2020.cnblogs.com/blog/2232223/202012/2232223-20201206181910112-1889473836.png)
1. odbc:ODBC library - link check failed:
解決方法:執行命令 `yum install unixODBC.x86_64 unixODBC-devel.x86_64` 進行安裝。
2. wx:wxWidgets not found, wx will NOT be usable:
解決方法:這個屬於 `APPLOICATION INFORMATION` ,可以不處理。
3. fakefop to generate placeholder PDF files,documentation: fop is missing.Using fakefop to generate placeholder PDF files:
解決方法:執行命令 `yum install fop.noarch` 進行安裝。
## 利用 Java API 實現一個生產者和消費者
接下來用 `Java` 原生的 `API` 來實現一個簡單的生產者和消費者:
1. `pom.xml` 檔案引入`RabbitMQ` 客戶端依賴:
```xml
```
2. 新建一個消費者 `TestRabbitConsumer` 類:
```java
package com.lonelyWolf.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TestRabbitConsumer {
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@ip:5672");
Connection conn = factory.newConnection();//建立連線
Channel channel = conn.createChannel(); //建立訊息通道
channel.queueDeclare("TEST_QUEUE", false, false, false, null);//宣告佇列
System.out.println("正在等待接收訊息...");
Consumer consumer = new DefaultConsumer(channel) {//建立消費者
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
System.out.println("收到訊息: " + new String(body, "UTF-8") + ",當前訊息ID為:" + properties.getMessageId());
System.out.println("收到自定義屬性:"+ properties.getHeaders().get("name"));
}
};
channel.basicConsume("TEST_QUEUE", true, consumer);//消費之後,回撥給consumer
}
}
```
3. 新建一個生產者 `TestRabbitProducter`類:
```java
package com.lonelyWolf.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class TestRabbitProducter {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://admin:123456@ip:5672");
Connection conn = factory.newConnection();// 建立連線
Channel channel = conn.createChannel();//建立訊息通道