RabbitMQ:入門(一)——RabbitMQ的安裝以及使用(Windows環境)
一、RabbitMQ介紹
1、RabbitMQ簡介
RabbitMQ是一個訊息代理:它接受和轉發訊息。你可以把它想象成一個郵局:當你把你想要釋出的郵件放在郵箱中時,你可以確定郵差先生最終將郵件傳送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受,儲存和轉發二進位制資料塊 - 訊息。引自(https://www.rabbitmq.com/tutorials/tutorial-one-java.html)官網介紹。
儘管訊息流經RabbitMQ,但它們只能儲存在佇列中。一個佇列只受主機記憶體和磁碟限制的約束,它本質上是一個很大的訊息緩衝區。許多生產者可以傳送進入一個佇列的訊息,並且許多消費者可以嘗試從一個佇列接收資料。實質上是生產者——消費者關係。
2、什麼叫訊息佇列
訊息(Message)是指在應用間傳送的資料。訊息可以非常簡單,比如只包含文字字串,也可以更復雜,可能包含嵌入物件。
訊息佇列(Message Queue)是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。訊息釋出者只管把訊息釋出到 MQ 中而不用管誰來取,訊息使用者只管從 MQ 中取訊息而不管是誰釋出的。這樣釋出者和使用者都不用知道對方的存在。
(注:該段引用來源:https://www.jianshu.com/p/79ca08116d57)
3、為何用訊息佇列
從上面的描述中可以看出訊息佇列是一種應用間的非同步協作機制,那什麼時候需要使用 MQ 呢?
以常見的訂單系統為例,使用者點選【下單】按鈕之後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發簡訊通知。在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的效能,這時可以將一些不需要立即生效的操作拆分出來非同步執行,比如發放紅包、發簡訊通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之後傳送一條訊息到 MQ 讓主流程快速完結,而由另外的單獨執行緒拉取MQ的訊息(或者由 MQ 推送訊息),當發現 MQ 中有發紅包或發簡訊之類的訊息時,執行相應的業務邏輯。
以上是用於業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。
4、RabbitMQ 特點
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高階訊息佇列協議。它是應用層協議的一個開放標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。
RabbitMQ 最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。具體特點包括:
(1)可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、釋出確認。
(2)靈活的路由(Flexible Routing)
在訊息進入佇列之前,通過 Exchange 來路由訊息的。對於典型的路由功能,RabbitMQ 已經提供了一些內建的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 繫結在一起,也通過外掛機制實現自己的 Exchange 。
(3)訊息叢集(Clustering)
多個 RabbitMQ 伺服器可以組成一個叢集,形成一個邏輯 Broker 。
(4)高可用(Highly Available Queues)
佇列可以在叢集中的機器上進行映象,使得在部分節點出問題的情況下佇列仍然可用。
(5)多種協議(Multi-protocol)
RabbitMQ 支援多種訊息佇列協議,比如 STOMP、MQTT 等等。
(6)多語言客戶端(Many Clients)
RabbitMQ 幾乎支援所有常用語言,比如 Java、.NET、Ruby 等等。
(7)管理介面(Management UI)
RabbitMQ 提供了一個易用的使用者介面,使得使用者可以監控和管理訊息 Broker 的許多方面。
(8)跟蹤機制(Tracing)
如果訊息異常,RabbitMQ 提供了訊息跟蹤機制,使用者可以找出發生了什麼。
(9)外掛機制(Plugin System)
RabbitMQ 提供了許多外掛,來從多方面進行擴充套件,也可以編寫自己的外掛。
(注:該段引用來源:https://www.jianshu.com/p/79ca08116d57)
5、RabbitMQ 中的概念模型——訊息模型
所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個佇列。生產者(producer)建立訊息,然後釋出到佇列(queue)中,最後將訊息傳送到監聽的消費者。
6、RabbitMQ 基本概念
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念:
(1)Message
訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
(2)Publisher
訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
(3)Exchange
交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。
(4)Binding
繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。
(5)Queue
訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
(6)Connection
網路連線,比如一個TCP連線。
(7)Channel
通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內地虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。
(8)Consumer
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
(9)Virtual Host
虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。
(10)Broker
表示訊息佇列伺服器實體。
(注:該段引用來源:https://www.jianshu.com/p/79ca08116d57)
二、RabbitMQ的安裝
1、Erlang的安裝
首先,您需要安裝支援的 Windows 版Erlang。下載並執行Erlang for Windows 安裝程式。下載地址http://www.erlang.org/downloads,我是64位的所以下載的64位版本。官網下載速度很慢,可以通過我雲盤下載:https://pan.baidu.com/s/1eTkk5BO 密碼:wo1b,下載完成後直接安裝,一直NEXT就行。
2、RabbitMQ安裝程式
執行RabbitMQ安裝程式rabbitmq-server-3.7.3.exe(下載地址http://www.rabbitmq.com/install-windows.html)注意版本,當前最新版本為3.7.3。它將RabbitMQ安裝為Windows服務並使用預設配置啟動它。同樣,一直NEXT就行。
3、自定義環境變數
該服務將使用其預設設定正常執行。你可以自定義RabbitMQ環境或編輯配置。
(1)erl環境變數配置
ERLANG_HOME=C:\Program Files\erl9.2
在Path中加入
%ERLANG_HOME%\bin;
測試erl配置是否正確,開始-執行-cmd,輸入erl,顯示如下,證明配置正確
(2)RabbitMQ環境變數配置
這裡注意,看好你RabbitMQ的安裝位置,以及安裝的版本,我的版本為3.7.3
RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3
在Path中加入
%RABBITMQ_SERVER%\sbin;
4、啟用rabbitmq_management
在CMD中鍵入如下命令
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
顯示如下:
5、啟動RabbitMQ服務
直接在命令列介面鍵入如下命令
net start RabbitMQ
因為RabbitMQ預設啟動的,當鍵入啟動命令時,會出現如下情況,直接關閉RabbitMQ服務,在啟動就行,
關閉RabbitMQ服務命令如下:
net stop RabbitMQ
三、RabbitMQ測試
測試地址 http://localhost:15672/
預設的使用者名稱:guest
預設的密碼為:guest
四、Java客戶端測試
1、RabbitMQ的jar包下載
如果你使用的是maven進行專案管理,請忽略,直接進入第二部,RabbitMQ Java客戶端的當前版本是 5.1.2。下載地址http://www.rabbitmq.com/java-client.html
2、maven專案中新增RabbitMQ依賴
直接在專案的pom.xml檔案中新增如下依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.2</version>
</dependency>
3、新建SendMQ類,傳送端(注該例項來自官網程式碼https://www.rabbitmq.com/tutorials/tutorial-one-java.html)
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SendMQ {
private final static String QUEUE_NAME = "Hello";
public static void main(String[] args) throws IOException, Exception {
// connection是socket連線的抽象,並且為我們管理協議版本協商(protocol version negotiation),
// 認證(authentication )等等事情。這裡我們要連線的訊息代理在本地,因此我們將host設為“localhost”。
// 如果我們想連線其他機器上的代理,只需要將這裡改為特定的主機名或IP地址。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672); //預設埠號
factory.setUsername("guest");//預設使用者名稱
factory.setPassword("guest");//預設密碼
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 接下來,我們建立一個channel,絕大部分API方法需要通過呼叫它來完成。
// 傳送之前,我們必須宣告訊息要發往哪個佇列,然後我們可以向佇列發一條訊息:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello world";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
4、新建RecvMQ類,接收端
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import org.junit.Test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RecvMQ {
private final static String QUEUE_NAME = "Hello";
public static void main(String[] args) throws IOException, Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}