1. 程式人生 > 其它 >RabbitMQ六種工作模式的對比與實踐

RabbitMQ六種工作模式的對比與實踐

目錄


  上一篇部落格我們介紹了RabbitMQ訊息通訊中的一些基本概念,這篇部落格我們介紹 RabbitMQ 的五種工作模式,這也是實際使用RabbitMQ需要重點關注的。

  這裡是RabbitMQ 官網中的相關介紹:http://www.rabbitmq.com/getstarted.html

  本篇部落格原始碼下載地址:https://github.com/YSOcean/RabbitMQTest

回到頂部

1、簡單佇列

  其實上篇文章末尾給出的程式碼就是簡單佇列。

  

  一個生產者對應一個消費者!!!

生產者將訊息傳送到“hello”佇列。消費者從該佇列接收訊息。

  ①、pom檔案

  必須匯入rabbitmq 依賴包

1     <dependency>
2       <groupId>com.rabbitmq</groupId>
3       <artifactId>amqp-client</artifactId>
4       <version>3.4.1</version>
5     </dependency>

  ②、工具類

 1 package com.ys.utils;
 2 
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 
 6 /**
 7  * Create by hadoop
 8  */
 9 public class ConnectionUtil {
10 
11     public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{
12         //1、定義連線工廠
13         ConnectionFactory factory = new ConnectionFactory();
14         //2、設定伺服器地址
15         factory.setHost(host);
16         //3、設定埠
17         factory.setPort(port);
18         //4、設定虛擬主機、使用者名稱、密碼
19         factory.setVirtualHost(vHost);
20         factory.setUsername(userName);
21         factory.setPassword(passWord);
22         //5、通過連線工廠獲取連線
23         Connection connection = factory.newConnection();
24         return connection;
25     }
26 }

  ③、生產者 Producer

 1 package com.ys.simple;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.ys.utils.ConnectionUtil;
 6 
 7 /**
 8  * Create by YSOcean
 9  */
10 public class Producer {
11     private final static String QUEUE_NAME = "hello";
12 
13     public static void main(String[] args) throws Exception{
14         //1、獲取連線
15         Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
16         //2、宣告通道
17         Channel channel = connection.createChannel();
18         //3、宣告(建立)佇列
19         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
20         //4、定義訊息內容
21         String message = "hello rabbitmq ";
22         //5、釋出訊息
23         channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
24         System.out.println("[x] Sent'"+message+"'");
25         //6、關閉通道
26         channel.close();
27         //7、關閉連線
28         connection.close();
29     }
30 }

  ④、消費者Consumer

 1 package com.ys.simple;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.QueueingConsumer;
 6 import com.ys.utils.ConnectionUtil;
 7 
 8 
 9 /**
10  * Create by YSOcean
11  */
12 public class Consumer {
13 
14     private final static String QUEUE_NAME = "hello";
15 
16     public static void main(String[] args) throws Exception{
17         //1、獲取連線
18         Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
19         //2、宣告通道
20         Channel channel = connection.createChannel();
21         //3、宣告佇列
22         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23         //4、定義佇列的消費者
24         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
25         //5、監聽佇列
26         /*
27             true:表示自動確認,只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功消費,都會認為訊息已經成功消費
28             false:表示手動確認,消費者獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,
29                    如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態,並且伺服器會認為該消費者已經掛掉,不會再給其
30                    傳送訊息,直到該消費者反饋。
31          */
32 
33         channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
34         //6、獲取訊息
35         while (true){
36             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
37             String message = new String(delivery.getBody());
38             System.out.println(" [x] Received '" + message + "'");
39         }
40     }
41 
42 }

  注意這裡消費者有自動確認訊息和手動確認訊息兩種模式。

回到頂部

2、work 模式

  

  一個生產者對應多個消費者,但是隻能有一個消費者獲得訊息!!!

  競爭消費者模式。

  ①、生產者

View Code

  ②、消費者

  這裡建立兩個消費者

  消費者1:每接收一條訊息後休眠10毫秒

View Code

  消費者2:每接收一條訊息後休眠1000毫秒

View Code

  ③、測試結果

  首先生產者一次列印從0-9條訊息

  

  接著我們看消費者1:結果為列印偶數條訊息

  

  消費者2:結果為列印奇數條訊息

  

  ④、分析結果

  消費者1和消費者2獲取到的訊息內容是不同的,也就是說同一個訊息只能被一個消費者獲取。

  消費者1和消費者2分別獲取奇數條訊息和偶數條訊息,兩種獲取訊息的條數是一樣的。

  前面我們說這種模式是競爭消費者模式,一條佇列被多個消費者監聽,這裡兩個消費者,其中消費者1和消費者2在獲取訊息後分別休眠了10毫秒和1000毫秒,也就是說兩個消費者獲取訊息的效率是不一樣的,但是結果卻是兩者獲得的訊息條數是一樣的,這根本就不構成競爭關係,那麼我們應該怎麼辦才能讓工作效率高的消費者獲取訊息更多,也就是消費者1獲取訊息更多呢?

  PS:在增加一個消費者其實獲取訊息條數也是一樣的,消費者1獲取0,3,6,9,消費者2獲取1,4,7,消費者3獲取2,5,8

  ⑤、能者多勞

channel.basicQos(1);

  增加如上程式碼,表示同一時刻伺服器只會傳送一條訊息給消費者。消費者1和消費者2獲取訊息結果如下:

  

  

  ⑥、應用場景

  效率高的消費者消費訊息多。可以用來進行負載均衡。

回到頂部

3、釋出/訂閱模式

   

  一個消費者將訊息首先發送到交換器,交換器繫結到多個佇列,然後被監聽該佇列的消費者所接收並消費。

  ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這裡的交換器是 fanout。下面我們會詳細介紹這幾種交換器。

  ①、生產者

 1 package com.ys.ps;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.ys.utils.ConnectionUtil;
 6 
 7 /**
 8  * Create by YSOcean
 9  */
10 public class Producer {
11     private final static String EXCHANGE_NAME = "fanout_exchange";
12 
13     public static void main(String[] args) throws Exception {
14         //1、獲取連線
15         Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest");
16         //2、宣告通道
17         Channel channel = connection.createChannel();
18         //3、宣告交換器
19         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
20         //4、建立訊息
21         String message = "hello rabbitmq";
22         //5、釋出訊息
23         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
24         System.out.println("[x] Sent'" + message + "'");
25         //6、關閉通道
26         channel.close();
27         //7、關閉連線
28         connection.close();
29     }
30 }

  ②、消費者

  消費者1:

View Code

  消費者2:

View Code

  注意:消費者1和消費者2兩者監聽的佇列名稱是不一樣的,我們可以通過前臺管理系統看到:

  

  ③、測試結果

  

  

  

  消費1和消費者2都消費了該訊息。

  ps:這是因為消費者1和消費者2都監聽了被同一個交換器繫結的佇列。如果訊息傳送到沒有佇列繫結的交換器時,訊息將丟失,因為交換器沒有儲存訊息的能力,訊息只能儲存在佇列中。

  ④、應用場景

  比如一個商城系統需要在管理員上傳商品新的圖片時,前臺系統必須更新圖片,日誌系統必須記錄相應的日誌,那麼就可以將兩個佇列繫結到圖片上傳交換器上,一個用於前臺系統更新圖片,另一個用於日誌系統記錄日誌。

回到頂部

4、路由模式

  

  生產者將訊息傳送到direct交換器,在繫結佇列和交換器的時候有一個路由key,生產者傳送的訊息會指定一個路由key,那麼訊息只會傳送到相應key相同的佇列,接著監聽該佇列的消費者消費訊息。

  也就是讓消費者有選擇性的接收訊息。

  ①、生產者

View Code

  ②、消費者

  消費者1:

View Code

  消費者2:

View Code

  ③、測試結果

  我們首先看程式碼,生產者釋出訊息,指定的路由key為update。消費者1繫結佇列和交換機時key分別是update/delete/add;消費者2繫結佇列和交換器時key是select。

  所以我們可以猜測生產者傳送的訊息,只有消費者1能夠接收並消費,而消費者2是不能接收的。

  

  

  

  ④、應用場景

  利用消費者能夠有選擇性的接收訊息的特性,比如我們商城系統的後臺管理系統對於商品進行修改、刪除、新增操作都需要更新前臺系統的介面展示,而查詢操作確不需要,那麼這兩個佇列分開接收訊息就比較好。

回到頂部

5、主題模式

  

  上面的路由模式是根據路由key進行完整的匹配(完全相等才傳送訊息),這裡的萬用字元模式通俗的來講就是模糊匹配。

  符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。

  ①、生產者

View Code

  ②、消費者

  消費者1:

View Code

  消費2:

View Code

  ③、分析結果

  生產者傳送訊息繫結的路由key為update.Name;消費者1監聽的佇列和交換器繫結路由key為update.#;消費者2監聽的佇列和交換器繫結路由key為select.#。

  很顯然,消費者1會接收到訊息,而消費者2接收不到。

回到頂部

6、四種交換器

  前面五種佇列模式介紹完了,但是實際上只有三種,第一種簡單佇列,第二種工作模式,剩下的三種都是和交換器繫結的合起來稱為一種,這小節我們就來詳細介紹交換器。

  交換器分為四種,分別是:direct、fanout、topic和 headers。

  前面三種分別對應路由模式、釋出訂閱模式和萬用字元模式,headers 交換器允許匹配 AMQP 訊息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是效能卻差很多,因此基本上不會用到該交換器,這裡也不詳細介紹。

  ①、direct

  如果路由鍵完全匹配的話,訊息才會被投放到相應的佇列。

  

  ②、fanout

  當傳送一條訊息到fanout交換器上時,它會把訊息投放到所有附加在此交換器上的佇列。

  

  ③、topic

  設定模糊的繫結方式,“*”操作符將“.”視為分隔符,匹配單個字元;“#”操作符沒有分塊的概念,它將任意“.”均視為關鍵字的匹配部分,能夠匹配多個字元。

  

回到頂部

7、總結

  關於 RabbitMQ 的五種佇列,其實實際使用最多的是最後一種主題模式,通過模糊匹配,使得操作更加自如。那麼我們總結一下有交換器參與的佇列(最後三種佇列)工作方式如下: