RabbitMQ六種工作模式的對比與實踐
目錄
上一篇部落格我們介紹了RabbitMQ訊息通訊中的一些基本概念,這篇部落格我們介紹 RabbitMQ 的五種工作模式,這也是實際使用RabbitMQ需要重點關注的。
這裡是RabbitMQ 官網中的相關介紹:http://www.rabbitmq.com/getstarted.html
本篇部落格原始碼下載地址:https://github.com/YSOcean/RabbitMQTest
回到頂部1、簡單佇列
其實上篇文章末尾給出的程式碼就是簡單佇列。
一個生產者對應一個消費者!!!
生產者將訊息傳送到“hello”佇列。消費者從該佇列接收訊息。
①、pom檔案
必須匯入rabbitmq 依賴包
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>3.4.1</version> 5 </dependency>
②、工具類
1 package com.ys.utils; 2 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 /** 7 * Create by hadoop 8 */ 9 public class ConnectionUtil { 10 11 public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{ 12 //1、定義連線工廠 13 ConnectionFactory factory = new ConnectionFactory(); 14 //2、設定伺服器地址 15 factory.setHost(host); 16 //3、設定埠 17 factory.setPort(port); 18 //4、設定虛擬主機、使用者名稱、密碼 19 factory.setVirtualHost(vHost); 20 factory.setUsername(userName); 21 factory.setPassword(passWord); 22 //5、通過連線工廠獲取連線 23 Connection connection = factory.newConnection(); 24 return connection; 25 } 26 }
③、生產者 Producer
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.ys.utils.ConnectionUtil; 6 7 /** 8 * Create by YSOcean 9 */ 10 public class Producer { 11 private final static String QUEUE_NAME = "hello"; 12 13 public static void main(String[] args) throws Exception{ 14 //1、獲取連線 15 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 16 //2、宣告通道 17 Channel channel = connection.createChannel(); 18 //3、宣告(建立)佇列 19 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20 //4、定義訊息內容 21 String message = "hello rabbitmq "; 22 //5、釋出訊息 23 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 24 System.out.println("[x] Sent'"+message+"'"); 25 //6、關閉通道 26 channel.close(); 27 //7、關閉連線 28 connection.close(); 29 } 30 }
④、消費者Consumer
1 package com.ys.simple; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.QueueingConsumer; 6 import com.ys.utils.ConnectionUtil; 7 8 9 /** 10 * Create by YSOcean 11 */ 12 public class Consumer { 13 14 private final static String QUEUE_NAME = "hello"; 15 16 public static void main(String[] args) throws Exception{ 17 //1、獲取連線 18 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); 19 //2、宣告通道 20 Channel channel = connection.createChannel(); 21 //3、宣告佇列 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 //4、定義佇列的消費者 24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 25 //5、監聽佇列 26 /* 27 true:表示自動確認,只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功消費,都會認為訊息已經成功消費 28 false:表示手動確認,消費者獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋, 29 如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態,並且伺服器會認為該消費者已經掛掉,不會再給其 30 傳送訊息,直到該消費者反饋。 31 */ 32 33 channel.basicConsume(QUEUE_NAME,true,queueingConsumer); 34 //6、獲取訊息 35 while (true){ 36 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); 37 String message = new String(delivery.getBody()); 38 System.out.println(" [x] Received '" + message + "'"); 39 } 40 } 41 42 }
注意這裡消費者有自動確認訊息和手動確認訊息兩種模式。
回到頂部2、work 模式
一個生產者對應多個消費者,但是隻能有一個消費者獲得訊息!!!
競爭消費者模式。
①、生產者
View Code②、消費者
這裡建立兩個消費者
消費者1:每接收一條訊息後休眠10毫秒
View Code消費者2:每接收一條訊息後休眠1000毫秒
View Code③、測試結果
首先生產者一次列印從0-9條訊息
接著我們看消費者1:結果為列印偶數條訊息
消費者2:結果為列印奇數條訊息
④、分析結果
消費者1和消費者2獲取到的訊息內容是不同的,也就是說同一個訊息只能被一個消費者獲取。
消費者1和消費者2分別獲取奇數條訊息和偶數條訊息,兩種獲取訊息的條數是一樣的。
前面我們說這種模式是競爭消費者模式,一條佇列被多個消費者監聽,這裡兩個消費者,其中消費者1和消費者2在獲取訊息後分別休眠了10毫秒和1000毫秒,也就是說兩個消費者獲取訊息的效率是不一樣的,但是結果卻是兩者獲得的訊息條數是一樣的,這根本就不構成競爭關係,那麼我們應該怎麼辦才能讓工作效率高的消費者獲取訊息更多,也就是消費者1獲取訊息更多呢?
PS:在增加一個消費者其實獲取訊息條數也是一樣的,消費者1獲取0,3,6,9,消費者2獲取1,4,7,消費者3獲取2,5,8
⑤、能者多勞
channel.basicQos(1);
增加如上程式碼,表示同一時刻伺服器只會傳送一條訊息給消費者。消費者1和消費者2獲取訊息結果如下:
⑥、應用場景
效率高的消費者消費訊息多。可以用來進行負載均衡。
回到頂部3、釋出/訂閱模式
一個消費者將訊息首先發送到交換器,交換器繫結到多個佇列,然後被監聽該佇列的消費者所接收並消費。
ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這裡的交換器是 fanout。下面我們會詳細介紹這幾種交換器。
①、生產者
1 package com.ys.ps; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.ys.utils.ConnectionUtil; 6 7 /** 8 * Create by YSOcean 9 */ 10 public class Producer { 11 private final static String EXCHANGE_NAME = "fanout_exchange"; 12 13 public static void main(String[] args) throws Exception { 14 //1、獲取連線 15 Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest"); 16 //2、宣告通道 17 Channel channel = connection.createChannel(); 18 //3、宣告交換器 19 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 20 //4、建立訊息 21 String message = "hello rabbitmq"; 22 //5、釋出訊息 23 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 24 System.out.println("[x] Sent'" + message + "'"); 25 //6、關閉通道 26 channel.close(); 27 //7、關閉連線 28 connection.close(); 29 } 30 }
②、消費者
消費者1:
View Code消費者2:
View Code注意:消費者1和消費者2兩者監聽的佇列名稱是不一樣的,我們可以通過前臺管理系統看到:
③、測試結果
消費1和消費者2都消費了該訊息。
ps:這是因為消費者1和消費者2都監聽了被同一個交換器繫結的佇列。如果訊息傳送到沒有佇列繫結的交換器時,訊息將丟失,因為交換器沒有儲存訊息的能力,訊息只能儲存在佇列中。
④、應用場景
比如一個商城系統需要在管理員上傳商品新的圖片時,前臺系統必須更新圖片,日誌系統必須記錄相應的日誌,那麼就可以將兩個佇列繫結到圖片上傳交換器上,一個用於前臺系統更新圖片,另一個用於日誌系統記錄日誌。
回到頂部4、路由模式
生產者將訊息傳送到direct交換器,在繫結佇列和交換器的時候有一個路由key,生產者傳送的訊息會指定一個路由key,那麼訊息只會傳送到相應key相同的佇列,接著監聽該佇列的消費者消費訊息。
也就是讓消費者有選擇性的接收訊息。
①、生產者
View Code②、消費者
消費者1:
View Code消費者2:
View Code③、測試結果
我們首先看程式碼,生產者釋出訊息,指定的路由key為update。消費者1繫結佇列和交換機時key分別是update/delete/add;消費者2繫結佇列和交換器時key是select。
所以我們可以猜測生產者傳送的訊息,只有消費者1能夠接收並消費,而消費者2是不能接收的。
④、應用場景
利用消費者能夠有選擇性的接收訊息的特性,比如我們商城系統的後臺管理系統對於商品進行修改、刪除、新增操作都需要更新前臺系統的介面展示,而查詢操作確不需要,那麼這兩個佇列分開接收訊息就比較好。
回到頂部5、主題模式
上面的路由模式是根據路由key進行完整的匹配(完全相等才傳送訊息),這裡的萬用字元模式通俗的來講就是模糊匹配。
符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。
①、生產者
View Code②、消費者
消費者1:
View Code消費2:
View Code③、分析結果
生產者傳送訊息繫結的路由key為update.Name;消費者1監聽的佇列和交換器繫結路由key為update.#;消費者2監聽的佇列和交換器繫結路由key為select.#。
很顯然,消費者1會接收到訊息,而消費者2接收不到。
回到頂部6、四種交換器
前面五種佇列模式介紹完了,但是實際上只有三種,第一種簡單佇列,第二種工作模式,剩下的三種都是和交換器繫結的合起來稱為一種,這小節我們就來詳細介紹交換器。
交換器分為四種,分別是:direct、fanout、topic和 headers。
前面三種分別對應路由模式、釋出訂閱模式和萬用字元模式,headers 交換器允許匹配 AMQP 訊息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是效能卻差很多,因此基本上不會用到該交換器,這裡也不詳細介紹。
①、direct
如果路由鍵完全匹配的話,訊息才會被投放到相應的佇列。
②、fanout
當傳送一條訊息到fanout交換器上時,它會把訊息投放到所有附加在此交換器上的佇列。
③、topic
設定模糊的繫結方式,“*”操作符將“.”視為分隔符,匹配單個字元;“#”操作符沒有分塊的概念,它將任意“.”均視為關鍵字的匹配部分,能夠匹配多個字元。
回到頂部
7、總結
關於 RabbitMQ 的五種佇列,其實實際使用最多的是最後一種主題模式,通過模糊匹配,使得操作更加自如。那麼我們總結一下有交換器參與的佇列(最後三種佇列)工作方式如下: