RabbitMQ入門筆記
一、MQ簡介
1.1、什麼是MQ訊息佇列
MQ全稱 Message Queue(訊息佇列),是一種應用間的通訊方式,訊息傳送後可以立即返回,由訊息系統來確保訊息的可靠傳遞。通過典型的 生產者
和消費者
模型,生產者不斷向訊息佇列中生產訊息,消費者不斷的從佇列中獲取訊息。因為訊息的生產和消費都是非同步的,而且只關心訊息的傳送和接收,這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量以及系統之間的解耦。利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。
1.2、MQ的優缺點
優點:
-
應用解耦:系統A要傳送一個訊息到多個系統,如果此時每增加一個系統或者刪除一個系統,系統A都需要通過修改原始碼來增加介面,此時耦合非常高,但是如果中間使用訊息佇列的話,系統只需要傳送一次到訊息佇列,別的系統就能複用該資訊,當增加或刪除系統呼叫介面的時候,不需要額外的更新程式碼。一個系統的耦合性越高,容錯性就越低,可維護性就越低。
-
非同步呼叫:還是ABCD四個系統,A系統收到一個請求,需要在自己本地寫庫,還需要往BCD三個系統寫庫,A系統自己寫本地庫需要3ms,往其他系統寫庫相對較慢,B系統200ms ,C系統350ms,D系統400ms,這樣算起來,整個功能從請求到響應的時間為3ms+200ms+350ms+400ms=953ms,接近一秒,時間太長無法接受,如果用了MQ,使用者傳送請求到A系統耗時3ms,A系統傳送三條訊息到MQ,假如耗時5ms,使用者從傳送請求到相應3ms+5ms=8ms,僅用了8ms然後返回結果給使用者,剩下的操作由BCD系統從Mq取出訊息做自己的操作。
-
削峰填谷:削峰就是某一段時間使用者請求突增時,由於系統不能處理這麼高的併發量,就將多餘的請求積壓在MQ中,系統每次從MQ取出一定數量的請求比如1000,當處理完這1000個請求,再從MQ中取出1000請求。由於MQ在高峰期積壓了很多的請求,在高峰期過後的一段時間內,消費訊息的速度還是會維持在1000,直到消費完積壓的訊息,這就叫做“填谷”。
缺點:
- 系統可用性降低:系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。需要保證MQ的高可用。
- 系統複雜度提高:MQ 的加入大大增加了系統的複雜度,以前系統間是同步的遠端呼叫,現在是通過 MQ 進行非同步呼叫。如何保證訊息沒有被重複消費?怎麼處理訊息丟失情況?怎麼保證訊息傳遞的順序性?
- 一致性問題:A 系統處理完業務,通過 MQ 給B、C、D三個系統發訊息資料,如果 B 系統、C 系統處理成功,D 系統處理失敗。如何保證訊息資料處理的一致性?
1.3、常見的MQ訊息佇列
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
公司/社群 | Apache | Rabbit | 阿里巴巴 | Apache |
協議支援 | OpenWire,STOMP,REST,XMPP,AMQP | AMQP,XMPP,SMTP,STOMP | 自定義 | 自定義協議,社群封裝了http協議支援 |
客戶端支援語言 | Java,C,C++,Python,PHP,Perl,.net等 | 官方支援Erlang,Java,Ruby等,社群產出多種API,幾乎支援所有語言 | Java,C++ | 官方支援Java,社群產出多種API,如PHP,Python等 |
單機吞吐量 | 萬級,吞吐量比RocketMQ和Kafka要低了一個數量級 | 萬級,吞吐量比RocketMQ和Kafka要低了一個數量級 | 10萬級,RocketMQ也是可以支撐高吞吐的一種MQ(最好) | 10萬級別,這是kafka最大的優點,就是吞吐量高。一般配合大資料類的系統來進行實時資料計算、日誌採集等場景 |
topic數量對吞吐量的影響 | topic可以達到幾百,幾千個的級別,吞吐量會有較小幅度的下降這是RocketMQ的一大優勢,在同等機器下,可以支撐大量的topic | topic從幾十個到幾百個的時候,吞吐量會大幅度下降所以在同等機器下,kafka儘量保證topic數量不要過多。如果要支撐大規模topic,需要增加更多的機器資源 | ||
時效性 | ms級 | 微秒級,這是rabbitmq的一大特點,延遲是最低的 | ms級 | 延遲在ms級以內 |
可用性 | 高,基於主從架構實現高可用性 | 高,基於主從架構實現高可用性 | 非常高,分散式架構 | 非常高,kafka是分散式的,一個數據多個副本,少數機器宕機,不會丟失資料,不會導致不可用 |
訊息可靠性 | 有較低的概率丟失資料 | 經過引數優化配置,可以做到0丟失 | 經過引數優化配置,訊息可以做到0丟失 | |
功能支援 | MQ領域的功能極其完備 | 基於erlang開發,所以併發能力很強,效能極其好,延時很低 | MQ功能較為完善,還是分散式的,擴充套件性好 | 功能較為簡單,主要支援簡單的MQ功能,在大資料領域的實時計算以及日誌採集被大規模使用,是事實上的標準 |
優劣勢總結 | 非常成熟,功能強大,在業內大量的公司以及專案中都有應用偶爾會有較低概率丟失訊息而且現在社群以及國內應用都越來越少,官方社群現在對ActiveMQ 5.x維護越來越少幾個月才釋出一個版本而且確實主要是基於解耦和非同步來用的,較少在大規模吞吐的場景中使用 | erlang語言開發,效能極其好,延時很低;吞吐量到萬級,MQ功能比較完備而且開源提供的管理介面非常棒,用起來很好用社群相對比較活躍,幾乎每個月都發布幾個版本分在國內一些網際網路公司近幾年用rabbitmq也比較多一些但是問題也是顯而易見的,RabbitMQ確實吞吐量會低一些,這是因為他做的實現機制比較重。而且erlang開發,國內有幾個公司有實力做erlang原始碼級別的研究和定製?如果說你沒這個實力的話,確實偶爾會有一些問題,你很難去看懂原始碼,你公司對這個東西的掌控很弱,基本職能依賴於開源社群的快速維護和修復bug。而且rabbitmq叢集動態擴充套件會很麻煩,不過這個我覺得還好。其實主要是erlang語言本身帶來的問題。很難讀原始碼,很難定製和掌控。 | 介面簡單易用,而且畢竟在阿里大規模應用過,有阿里品牌保障日處理訊息上百億之多,可以做到大規模吞吐,效能也非常好,分散式擴充套件也很方便,社群維護還可以,可靠性和可用性都是ok的,還可以支撐大規模的topic數量,支援複雜MQ業務場景而且一個很大的優勢在於,阿里出品都是java系的,我們可以自己閱讀原始碼,定製自己公司的MQ,可以掌控社群活躍度相對較為一般,不過也還可以,文件相對來說簡單一些,然後介面這塊不是按照標準JMS規範走的有些系統要遷移需要修改大量程式碼還有就是阿里出臺的技術,你得做好這個技術萬一被拋棄,社群黃掉的風險,那如果你們公司有技術實力我覺得用RocketMQ挺好的 | kafka的特點其實很明顯,就是僅僅提供較少的核心功能,但是提供超高的吞吐量,ms級的延遲,極高的可用性以及可靠性,而且分散式可以任意擴充套件同時kafka最好是支撐較少的topic數量即可,保證其超高吞吐量而且kafka唯一的一點劣勢是有可能訊息重複消費,那麼對資料準確性會造成極其輕微的影響,在大資料領域中以及日誌採集中,這點輕微影響可以忽略這個特性天然適合大資料實時計算以及日誌收集 |
1.4、RabbitMQ的背景
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。實現MQ大致有兩種主流方式:AMQP、JMS。
- AMQP(Advanced Message Queue):高階訊息佇列協議,一個提供統訊息服務的應用層標準高階訊息佇列協議。它是應用層協議的一個開放標準,為面向訊息的中介軟體設計,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受產品、開發語言等條件的限制。AMQP是一個二進位制協議,擁有一些現代化特點:多通道、協商式,非同步,安全,擴平臺,中立,高效。
- JMS(JavaMessage Service):Java訊息服務,應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。
AMQP 與 JMS 區別:
- JMS是定義了統一的介面,來對訊息操作進行統一;AMQP是通過規定協議來統一資料互動的格式
- JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
- JMS規定了兩種訊息模式;而AMQP的訊息模式更加豐富
簡單來說AMQP是一組協議,而JMS是一套Java的API
詳細瞭解區別可以參考這篇部落格:https://blog.csdn.net/hpttlook/article/details/23391967
二、RabbitMQ的基本概念
Rabbit的基礎架構如下:
-
Broker:表示訊息佇列伺服器實體,接收和分發訊息的應用。
-
Virtual host:當多個不同的使用者使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個使用者在自己的 vhost 建立 exchange/queue 等。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。
-
Connection:publisher/consumer 和 broker 之間的 TCP 連線
-
Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在訊息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連線,如果應用程式支援多執行緒,通常每個thread建立單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了作業系統建立 TCP connection 的開銷.
-
Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到queue 中去。常用的型別有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
-
Queue:訊息佇列,用來儲存訊息直到被消費者取走。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
-
Binding:exchange 和 queue 之間的虛擬連線,binding 中可以包含 routing key。Binding 資訊被儲存到 exchange 中的查詢表中,用於 message 的分發依據
-
Publisher:訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
-
Consumer:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
-
Message:訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
RabbitMQ官方地址:http://www.rabbitmq.com/
三、RabbitMQ的安裝及配置
- 下載RabbitMq和Erlang的依賴包上傳到Centos7
下載地址:https://www.rabbitmq.com/download.html
RabbitMq是由Erlang編寫的在安裝RabbitMQ之前需要先安裝Erlang
-
安裝
# 1.安裝Erlang依賴包 rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm # 2.安裝RabbitMQ安裝包(需要聯網) #預設安裝完成後rabbitmq有個配置檔案模板/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example,需要將配置檔案複製到/etc/rabbitmq/目錄中,並修改名稱為rabbitmq.config yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm # 3.複製配置檔案到/etc/rabbitmq/目錄下並將名字改為rabbitmq.config cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config # 4.修改配置檔案 開啟guest使用者 vim /etc/rabbitmq/rabbitmq.config
將{loopback_users, []},前邊的‘%%’註釋和後邊的逗號刪掉
# 5.rabbitmq有一個圖形化的管理介面需要啟動這個外掛,需要從瀏覽器開啟這個介面預設埠為15672 rabbitmq-plugins enable rabbitmq_management # 6.啟動RabbitMQ服務 systemctl start rabbitmq-server # 7. 關閉防火牆 systemctl disable firewalld
可以通過http://IP地址:15672訪問web管理介面,賬戶為guest,密碼也為guest
注意:如果使用的雲伺服器需要在控制檯安全組放行埠
-
通過UI介面新增使用者和虛擬主機
- 建立使用者
上面的Tags選項,其實是指定使用者的角色,可選的有以下幾個:
-
超級管理員(administrator)
可登陸管理控制檯,可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。
-
監控者(monitoring)
可登陸管理控制檯,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁碟使用情況等)
-
策略制定者(policymaker)
可登陸管理控制檯, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框標識的部分)。
-
普通管理者(management)
僅可登陸管理控制檯,無法看到節點資訊,也無法對策略進行管理。
-
其他
無法登陸管理控制檯,通常就是普通的生產者和消費者。
-
建立虛擬主機
-
繫結使用者和虛擬主機
-
RabbitMq管理行命令
# RabbitMQ除了有web段的圖形化管理頁面也可以使用命令
# 檢視RabbitMQ的命令
rabbitmqctl help
# 檢視RabbitMQ版本
rabbitmqctl version
# 檢視RabbitMQ外掛的命令
rabbitmq-plugins help
# 檢視外掛是否啟動
rabbitmq_management is_enabled
#檢視外掛
rabbitmq-plugins list
rabbitmqctl help 展現的部分命令截圖:
rabbitmq-plugins help 命令截圖:
安裝RabbitMq8可一看看這篇文章:https://blog.csdn.net/weixin_40584261/article/details/106826044
四、RabbitMQ的工作模式
在 RabbitMQ 官網上提供了 6 中工作模式:簡單模式、工作佇列模式、釋出/訂閱模式、路由模式、主題模式 和 RPC 模式以及最新的釋出確認模式。
1、Hello Word簡單模式
簡單工作模式由 3 個物件組成:生產者、佇列、消費者。只有一個消費者,生產者將訊息放入佇列,消費者監聽訊息佇列,如果佇列中有訊息,就消費掉,訊息被拿走後,自動從佇列中刪除。
-
建立Maven工程匯入RabbitMQ的java客戶端依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency>
-
獲取與RabbitMQ連線的工具類
public class RabbitMqUtil { //連線工廠,用來建立連線 private static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); //主機地址 connectionFactory.setHost("48.122.183.82"); //rabbitMq預設埠號是5672,UI管理介面是15672 connectionFactory.setPort(5672); //設定虛擬主機,rabbitmq預設虛擬機器名稱為“/”,虛擬機器相當於一個獨立的mq服務虛擬機器相當於一個獨立的mq服務,可以自己在UI介面新建一個虛擬主機並新增使用者 connectionFactory.setVirtualHost("/rabbitmqdemo"); //賬戶 connectionFactory.setUsername("admin"); //密碼 connectionFactory.setPassword("admin"); } public static Connection getConnection() { try { //建立與RabbitMQ服務的TCP連線 return connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } public static void closeConnectionAndChannel(Connection connection, Channel channel) { try { if (channel != null) { //關閉通道 channel.close(); } } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } try { if (connection != null) { //關閉連線 connection.close(); } } catch (IOException e) { e.printStackTrace(); } } }
3.生產者
public class Provider { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqUtil2.getConnection(); //建立與Exchange的通道,每個連線可以建立多個通道 Channel channel = connection.createChannel(); //通道繫結訊息佇列,如果佇列不存在則自動建立 //引數1:佇列的名字 // 引數2用來定義佇列是否持久化 true為持久化(rabbit服務關閉就會丟失佇列,佇列中的訊息也會丟失,設定為true佇列可持久化,要想訊息也被持久化需要額外設定) // 引數3exclusive:是否獨佔佇列(只允許當前連線可用) //引數4:是否在消費完成消費後(消費者與佇列斷開連線)自動刪除佇列 //引數5 : 額外的附加引數 //注意:生產者與消費者佇列的引數必須一致!!!!! channel.queueDeclare("hello",true,false,false,null); //這裡沒有指定交換機,訊息將傳送給預設交換機,每個佇列也會繫結那個預設的交換機,但是不能顯 示繫結或解除繫結,預設的交換機,routingKey等於佇列名稱 //釋出訊息 //引數1: 交換機名稱 //引數2:引數佇列名稱 //引數3:傳遞訊息的額外設定 MessageProperties.PERSISTENT_TEXT_PLAIN :訊息持久化 //引數4:訊息的具體內容 channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,("hello rabbitmq"+new Date()).getBytes()); RabbitMqUtil.closeConnectionAndChannel(connection,channel); } }
4.消費者
/** *消費者開啟後會一直消費佇列中的訊息 * hello word模型 只有一個消費者 一對一 交換機引數為空字串表示預設交換機,生產者生產訊息後直接**放入訊息佇列 */ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqUtil.getConnection(); Channel channel = connection.createChannel(); //宣告佇列 channel.queueDeclare("hello", false,false,false,null); //引數1:消費哪個佇列的訊息 //引數2:開啟訊息的自動確認機制 //引數3:消費訊息時的回撥介面 channel.basicConsume("hello",true,new DefaultConsumer(channel){ //body 訊息佇列中取出的訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); System.out.println("consumerTag = " + consumerTag); System.out.println("DeliveryTag = " +envelope.getDeliveryTag()); System.out.println("Redeliver = "+envelope.isRedeliver()); } }); } }
2、work queue工作佇列模式
Work Queues
與入門程式的簡單模式
相比,多了一個或一些消費端,多個消費端共同消費同一個佇列中的訊息。預設情況下,RabbitMQ將按順序將每個訊息傳送給下一個使用者。平均而言,每個消費者都會收到相同數量的訊息。這種分發訊息的方式稱為迴圈。對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度。
1.生產者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//通過通道繫結佇列
channel.queueDeclare("workqueue",true,false,false,null);
for (int i = 0; i < 30; i++) {
channel.basicPublish("","workqueue", MessageProperties.PERSISTENT_BASIC,("workqueue訊息"+i).getBytes());
}
RabbitMqUtil.closeConnectionAndChannel(connection,channel);
}
}
/**
可以多寫幾個消費者
*/
class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//通道繫結佇列
channel.queueDeclare("workqueue",true,false,false,null);
//每次分配一個訊息,確認一個訊息,才又分配一個訊息,為了使消費者做到能者多勞
channel.basicQos(1);
//處理訊息第二個引數為false表示不自動確認
channel.basicConsume("workqueue",false,new DefaultConsumer(channel){
/**
* consumerTag 訊息者標籤,在channel.basicConsume時候可以指定
* envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,交換機,訊息和重傳標誌(收到訊息失敗後是否需要重新發送)
* properties 屬性資訊
* body 訊息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
System.out.println("處理訊息"+new String(body));
//手動確認訊息已經被處理
//引數1:訊息標識
//引數2: 是否同時確認多個訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//通道繫結佇列
channel.queueDeclare("workqueue",true,false,false,null);
//每次分配一個訊息,確認一個訊息,才又分配一個訊息
channel.basicQos(1);
//處理訊息
channel.basicConsume("workqueue",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("處理訊息"+new String(body));
//手動確認訊息已經被處理
//引數1:訊息標識
//引數2: 是否同時確認多個訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
3、Publish/Subscribe釋出與訂閱模式
Exchange分發訊息時根據型別的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 訊息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但效能差很多,目前幾乎用不到了。釋出訂閱模式,交換機型別設定為fanout。
- 、每個消費者監聽自己的佇列。
- 生產者將訊息發給broker,由交換機將訊息轉發到繫結此交換機的每個佇列,每個繫結交換機的佇列都將接收到訊息。fanout 型別轉發訊息是最快的。
生產者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//將通道宣告指定交換機
//引數1:交換機名稱
//引數2: 交換機型別 fanout 廣播型別
channel.exchangeDeclare("logs","fanout");
//傳送訊息,fanout交換機這裡的路由key在廣播下幾乎沒什麼作用
channel.basicPublish("logs","",null,"fanout message queue".getBytes());
RabbitMqUtil.closeConnectionAndChannel(connection,channel);
}
}
消費者
/**
可以多寫幾個消費者
*/
class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("logs","fanout");
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//交換機繫結臨時佇列
//引數1:佇列名稱
//引數2:交換機名稱
//引數三:routekey,這裡用不到
channel.queueBind(queue,"logs","");
//消費訊息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+ new String(body));
}
});
}
}
class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("logs","fanout");
//臨時佇列
String queue = channel.queueDeclare().getQueue();
//交換機繫結臨時佇列
channel.queueBind(queue,"logs","");
//消費訊息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+ new String(body));
}
});
}
}
交換機需要與佇列進行繫結,繫結之後;一個訊息可以被多個消費者都收到。
釋出訂閱模式與工作佇列模式的區別:
1、工作佇列模式不用定義交換機,而釋出/訂閱模式需要定義交換機。
2、釋出/訂閱模式的生產方是面向交換機發送訊息,工作佇列模式的生產方是面向佇列傳送訊息(底層使用預設交換機)。
3、釋出/訂閱模式需要設定佇列和交換機的繫結,工作佇列模式不需要設定,實際上工作佇列模式會將佇列綁 定到預設的交換機 。
4、 Routing路由模式
交換機的型別設定為direct
- 佇列與交換機的繫結,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key),佇列可以指定多個RoutingKey; - 訊息的傳送方在 向 Exchange傳送訊息時,也必須指定訊息的
RoutingKey
。 - Exchange不再把訊息交給每一個繫結的佇列,而是根據訊息的
Routing Key
進行判斷,只有佇列的Routingkey
與訊息的Routing key
完全一致,才會接收到訊息
生產者
public class Provider {
public static void main(String[] args) throws IOException {
//連線
Connection connection = RabbitMqUtil.getConnection();
//通道
Channel channel = connection.createChannel();
//交換機,引數1:交換機名稱,引數2:型別路由模式
channel.exchangeDeclare("logs_direct","direct");
//路由key
String routeKey = "error";
//傳送訊息
channel.basicPublish("logs_direct",routeKey,null,("路由訊息key="+routeKey).getBytes());
RabbitMqUtil.closeConnectionAndChannel(connection,channel);
}
}
消費者
class Consumer1 {
public static void main(String[] args) throws IOException {
//連線
Connection connection = RabbitMqUtil.getConnection();
//通道
Channel channel = connection.createChannel();
//交換機
channel.exchangeDeclare("logs_direct","direct");
//建立臨時佇列
String queue = channel.queueDeclare().getQueue();
//基於route key 繫結佇列和交換機
channel.queueBind(queue,"logs_direct","error");
channel.basicConsume(queue,true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
class Comsumer2{
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_direct","direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"logs_direct","info");
channel.queueBind(queue,"logs_direct","error");
channel.queueBind(queue,"logs_direct","warn");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+ new String(body));
}
});
}
}
5.Topics萬用字元模式
Topic
型別與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在繫結Routing key
的時候使用萬用字元!
Routingkey
一般都是有一個或多個單片語成,多個單詞之間以”.”分割.
:匹配0個或者多個單詞
*:配置一個單詞
生產者
public class Provider {
public static void main(String[] args) throws IOException {
//連線
Connection connection = RabbitMqUtil.getConnection();
//通道
Channel channel = connection.createChannel();
//交換機
channel.exchangeDeclare("ex_topic","topic");
//路由key
String routeKey = "topic";
//生產
channel.basicPublish("ex_topic",routeKey,null,"topic訊息佇列".getBytes());
//關閉資源
RabbitMqUtil.closeConnectionAndChannel(connection,channel);
}
}
消費者
class Consumer1 {
public static void main(String[] args) throws IOException {
//連線
Connection connection = RabbitMqUtil.getConnection();
//通道
Channel channel = connection.createChannel();
//交換機
channel.exchangeDeclare("ex_topic","topic");
//訊息佇列
String queue = channel.queueDeclare().getQueue();
//交換機繫結訊息佇列
channel.queueBind(queue,"ex_topic","*.topic");
//消費
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
class Consumer2 {
public static void main(String[] args) throws IOException {
//連線
Connection connection = RabbitMqUtil.getConnection();
//通道
Channel channel = connection.createChannel();
//交換機
channel.exchangeDeclare("ex_topic","topic");
//訊息佇列
String queue = channel.queueDeclare().getQueue();
//交換機繫結訊息佇列
channel.queueBind(queue,"ex_topic","topic");
//消費
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
class Consumer3 {
public static void main(String[] args) throws IOException {
//連線
Connection connection = RabbitMqUtil.getConnection();
//通道
Channel channel = connection.createChannel();
//交換機
channel.exchangeDeclare("ex_topic","topic");
//訊息佇列
String queue = channel.queueDeclare().getQueue();
//交換機繫結訊息佇列
channel.queueBind(queue,"ex_topic","topic.#");
//消費
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者3:"+new String(body));
}
});
}
}
五、Spring整合RabbitMQ
生產者:
新增依賴:
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
建立spring-rabbitmq-producer.xml配置檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="74.121.83.182"
port="5672"
username="guest"
password="guest"
virtual-host="/rabbitmqdemo"/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義持久化佇列,不存在則自動建立;不繫結到交換機則繫結到預設交換機
預設交換機型別為direct,名字為:"",路由鍵為佇列的名稱
-->
<!--
id:bean的名稱
name:queue的名稱
auto-declare:自動建立
auto-delete:自動刪除。 最後一個消費者和該佇列斷開連線後,自動刪除佇列
exclusive:是否獨佔
durable:是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有佇列都能收到訊息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true" />
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定義fanout型別交換機;並繫結上述兩個佇列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1" />
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--direct型別的交換機-->
<!-- <rabbit:direct-exchange name="aa" >
<rabbit:bindings>
<!–direct 型別的交換機繫結佇列 key :路由key queue:佇列名稱–>
<rabbit:binding queue="spring_queue" key="xxx"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>-->
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~萬用字元;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動建立-->
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="topic.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="topic.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="*.topic.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbitTemplate物件操作可以在程式碼中方便傳送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
在test目錄下建立測試類
傳送訊息
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 只發佇列訊息
* 預設交換機型別為 direct
* 交換機的名稱為空,路由鍵為佇列的名稱
*/
@Test
public void queueTest(){
//路由鍵與佇列同名
rabbitTemplate.convertAndSend("spring_queue", "只發佇列spring_queue的訊息。");
}
/**
* 傳送廣播
* 交換機型別為 fanout
* 繫結到該交換機的所有佇列都能夠收到訊息
*/
@Test
public void fanoutTest(){
/**
* 引數1:交換機名稱
* 引數2:路由鍵名(廣播設定為空)
* 引數3:傳送的訊息內容
*/
rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "傳送到spring_fanout_exchange交換機的廣播訊息");
}
/**
* 萬用字元
* 交換機型別為 topic
* 匹配路由鍵的萬用字元,*表示一個單詞,#表示多個單詞
* 繫結到該交換機的匹配佇列能夠收到對應訊息
*/
@Test
public void topicTest(){
/**
* 引數1:交換機名稱
* 引數2:路由鍵名
* 引數3:傳送的訊息內容
*/
rabbitTemplate.convertAndSend("spring_topic_exchange", "topic", "傳送到spring_topic_exchange交換機htopic的訊息");
}
}
消費者:
匯入pom依賴同生產者一樣
建立各個監聽器類
public class SpringTopicListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(new String(message.getBody()));
}
}
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
編寫spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="74.121.83.182"
port="5672"
username="guest"
password="guest"
virtual-host="/rabbitmqdemo"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 配置兩個監聽器-->
<bean id="springQueueListener" class="com.rabbitmq.listener.SpringQueueListener"/>
<bean id="springTopicListener" class="com.rabbitmq.listener.SpringTopicListener"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref = "springTopicListener" queue-names="spring_topic_queue_well"/>
</rabbit:listener-container>
</beans>
六、SpringBoot整合RabbitMQ
6.1、註解的方式
建立SpringBoot工程引入mq的starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
編寫yml配置檔案
spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 74.121.83.182
port: 5672
username: guest
password: guest
virtual-host: /rabbitmqdemo
-
簡單hello word模型
生產者:
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testHello(){ /** *hello 為佇列名稱 */ rabbitTemplate.convertAndSend("hello","hello world"); }
消費者:
@Component @RabbitListener(queuesToDeclare = @Queue("hello")) public class HelloCustomer { @RabbitHandler public void receive1(String message){ System.out.println("message = " + message); } }
-
work queue工作佇列模型
生產者:
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testWork(){ for (int i = 0; i < 10; i++) { /** *work為佇列名稱 */ rabbitTemplate.convertAndSend("work","hello work!"); } }
消費者:
@Component public class WorkCustomer { /** *@RabbitListenerke可以直接註解在方法上,就不需要@RabbitHandler */ @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message){ System.out.println("work message1 = " + message); } @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message){ System.out.println("work message2 = " + message); } }
:預設在Spring AMQP實現中Work這種方式就是公平排程,如果需要實現能者多勞需要在yaml中配置
spring: rabbitmq: listener: simple: prefetch: 1
-
fanout廣播模型
生產者
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testFanout() throws InterruptedException { rabbitTemplate.convertAndSend("logs","","這是日誌廣播"); }
消費者
@Component public class FanoutCustomer { @RabbitListener(bindings = @QueueBinding( value = @Queue,////建立臨時佇列 exchange = @Exchange(name="logs",type = "fanout") )) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = @QueueBinding( value = @Queue, //建立臨時佇列 exchange = @Exchange(name="logs",type = "fanout") //繫結交換機型別 )) public void receive2(String message){ System.out.println("message2 = " + message); } }
-
Route路由模型
生產者
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testDirect(){ rabbitTemplate.convertAndSend("directs","error","error 的日誌資訊"); }
消費者
@Component public class DirectCustomer { @RabbitListener(bindings ={ @QueueBinding( value = @Queue(),//沒有指定名字就是零食佇列 key={"info","error"},//route_key exchange = @Exchange(type = "direct",name="directs") )}) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings ={ @QueueBinding( value = @Queue(), key={"error"}, exchange = @Exchange(type = "direct",name="directs") )}) public void receive2(String message){ System.out.println("message2 = " + message); } }
-
Topic 訂閱模型
生產者
@Autowired private RabbitTemplate rabbitTemplate; //topic @Test public void testTopic(){ rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的訊息"); }
消費者
@Component public class TopCustomer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.*"}, exchange = @Exchange(type = "topic",name = "topics") ) }) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, key = {"user.#"}, exchange = @Exchange(type = "topic",name = "topics") ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
配置類的方式:
編寫配置類:
@Configuration
public class RabbitMQConfig {
/**
* 交換機
* @return
*/
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange("boot_topic").durable(true).autoDelete().build();
}
/**
* 佇列
* @return
*/
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable("boot_queue").build();
}
/**
* 繫結交換機和佇列
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
生產者:
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("boot_topic","boot.test","hello ! SpringBoot整合RabbitMQ");
}
消費者:
@RabbitListener(queues = "boot_queue")
public void rabbitmqListener(Message message){
System.out.println(message);
}
以上是個人筆記,有錯誤請大家指出