1. 程式人生 > >深入剖析 RabbitMQ —— Spring 框架下實現 AMQP 高階訊息佇列協議

深入剖析 RabbitMQ —— Spring 框架下實現 AMQP 高階訊息佇列協議

前言

訊息佇列在現今資料量超大,併發量超高的系統中是十分常用的。本文將會對現時最常用到的幾款訊息佇列框架 ActiveMQ、RabbitMQ、Kafka 進行分析對比。
詳細介紹 RabbitMQ 在 Sprinig 框架下的結構及實現原理,從Producer 端的事務、回撥函式(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 資訊接收容器進行詳細的分析。通過對 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用型別介紹,深入剖析在訊息處理各個傳輸環節中的原理及注意事項。

並舉以例項對死信佇列、持久化操作進行一一介紹。

 

目錄

一、RabbitMQ 與 AMQP 的關係

二、RabbitMQ 的實現原理

三、RabbitMQ 應用例項

四、Producer 端的訊息傳送與監控

五、Consumer 端的訊息接收與監控

六、死信佇列

七、持久化操作

 

 

 

一、RabbitMQ 與 AMQP 的關係

1.1 AMQP簡介

AMQP(Advanced Message Queue Protocol 高階訊息佇列協議)是一個訊息佇列協議,它支援符合條件的客戶端和訊息代理中介軟體(message middleware broker)進行通訊。RabbitMQ 則是 AMQP 協議的實現者,主要用於在分散式系統中資訊的儲存傳送與接收,RabbitMQ 的伺服器端用 Erlang 語言編寫,客戶端支援多種開發語言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。

1.2   ActiveMQ、RabbitMQ、Kafka 對比

現在在市場上有 ActiveMQ、RabbitMQ、Kafka 等多個常用的訊息佇列框架,與其他框架對比起來,RabbitMQ 在易用性、擴充套件性、高可用性、多協議、支援多語言客戶端等方面都有不俗表現。

 

1.2.1 AcitveMQ 特點

ActiveMQ 是 Apache 以 Java 語言開發的訊息模型,它完美地支援 JMS(Java Message Service)訊息服務,客戶端支援 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多種開主發語言,支援OpenWire、Stomp、REST、XMPP、AMQP 等多種協議。ActiveMQ 採用非同步訊息傳遞方式,在設計上保證了多主機叢集,客戶端-伺服器,點對點等模式的有效通訊。從開始它就是按照 JMS 1.1 和 J2EE 1.4 規範進行開發,實現了訊息持久化,XA,事務支撐等功能。經歷多年的升級完善,現今已成為 Java 應用開發中主流的訊息解決方案。但相比起 RabbitMQ、Kafka 它的主要缺點表現為資源消耗比較大,吞吐量較低,在高併發的情況下系統支撐能力較弱。如果系統全程使用 Java 開發,其併發量在可控範圍內,或系統需要支援多種不同的協議,使用 ActiveMQ 可更輕便地搭建起訊息佇列服務。

1.2.2 Kafka 特點

Kafka 天生是面向分散式系統開發的訊息佇列,它具有高效能、容災性、可動態擴容等特點。Kafka 與生俱來的特點在於它會把每個Partition 的資料都備份到不同的伺服器當中,並與 ZooKeeper 配合,當某個Broker 故障失效時,ZooKeeper 服務就會將通知生產者和消費者,從備份伺服器進行資料恢復。在效能上 Kafka 也大大超越了傳統的 ActiveMQ、RabbitMQ ,由於 Kafka 叢集可支援動態擴容,在負載量到達峰值時可動態增加新的伺服器進叢集而無需重啟服務。但由於 Kafka 屬於分散式系統,所以它只能在同一分割槽內實現訊息有序,無法實現全域性訊息有序。而且它內部的監控機制不夠完善,需要安裝外掛,依賴ZooKeeper 進行元資料管理。如果系統屬於分散式管理機制,資料量較大且併發量難以預估的情況下,建議使用 Kafka 佇列。

1.2.3 RabbitMQ 對比

由於 ActiveMQ 過於依賴 JMS 的規範而限制了它的發展,所以 RabbitMQ 在效能和吞吐量上明顯會優於 ActiveMQ。
由於上市時間較長,在可用性、穩定性、可靠性上 RabbitMq 會比 Kafka 技術成熟,而且 RabbitMq 使用 Erlang 開發,所以天生具備高併發高可用的特點。而 Kafka 屬於分散式系統,它的效能、吞吐量、TPS 都會比 RabbitMq 要強。

回到目錄

二、RabbitMQ 的實現原理

2.1 生產者(Producer)、消費者(Consumer)、服務中心(Broker)之間的關係

首先簡單介紹 RabbitMQ 的執行原理,在 RabbitMQ 使用時,系統會先安裝並啟動 Broker Server,也就是 RabbitMQ 的服務中心。無論是生產者 (Producer),消費者(Consumer)都會通過連線池(Connection)使用 TCP/IP 協議(預設)來與 BrokerServer 進行連線。然後 Producer 會把 Exchange / Queue 的繫結資訊傳送到 Broker Server,Broker Server 根據 Exchange 的型別邏輯選擇對應 Queue ,最後把資訊傳送到與 Queue 關聯的對應 Consumer 。

 

2.2 交換器(Exchange)、佇列(Queue)、通道(Channel)、繫結(Binding)的概念

2.2.1 交換器 Exchange

Producer 建立連線後,並非直接將訊息投遞到佇列 Queue 中,而是把訊息傳送到交換器 Exchange,由 Exchange 根據不同邏輯把訊息傳送到一個或多個對應的隊列當中。目前 Exchange 提供了四種不同的常用型別:Fanout、Direct、Topic、Header。

  • Fanout型別

此型別是最為常見的交換器,它會將訊息轉發給所有與之繫結的佇列上。比如,有N個佇列與 Fanout 交換器繫結,當產生一條訊息時,Exchange 會將該訊息的N個副本分別發給每個佇列,類似於廣播機制。

  • Direct型別

此型別的 Exchange 會把訊息傳送到 Routing_Key 完全相等的隊列當中。多個 Cousumer 可以使用相同的關鍵字進行繫結,類似於資料庫的一對多關係。比如,Producer 以 Direct 型別的 Exchange 推送 Routing_Key 為 direct.key1 的佇列,系統再指定多個 Cousumer 繫結 direct.key1。如此,訊息就會被分發至多個不同的 Cousumer 當中。

  • Topic型別

此型別是最靈活的一種方式配置方式,它可以使用模糊匹配,根據 Routing_Key 繫結到包含該關鍵字的不同佇列中。比如,Producer 使用 Topic型別的 Exchange 分別推送 Routing_Key 設定為 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不同佇列,Cousumer 只需要把 Routing_Key 設定為 topic.guangdong.# ,就可以把所有訊息接收處理。

  • Headers型別

該型別的交換器與前面介紹的稍有不同,它不再是基於關鍵字 Routing_Key 進行路由,而是基於多個屬性進行路由的,這些屬性比路由關鍵字更容易表示為訊息的頭。也就是說,用於路由的屬性是取自於訊息 Header 屬性,當訊息 Header 的值與佇列繫結時指定的值相同時,訊息就會路由至相應的佇列中。

2.2.2 Queue 佇列

Queue 佇列是訊息的載體,每個訊息都會被投入到 Queue 當中,它包含 name,durable,arguments 等多個屬性,name 用於定義它的名稱,當 durable(持久化)為 true 時,佇列將會持久化儲存到硬碟上。反之為 false 時,一旦 Broker Server 被重啟,對應的佇列就會消失,後面還會有例子作詳細介紹。

2.2.3 Channel 通道

當 Broker Server 使用 Connection 連線 Producer / Cousumer 時會使用到通道(Channel),一個 Connection上可以建立多個 Channel,每個 Channel 都有一個會話任務,可以理解為邏輯上的連線。主要用作管理相關的引數定義,傳送訊息,獲取訊息,事務處理等。

2.2.4 Binding 繫結

Binding 主要用於繫結交換器 Exchange 與 佇列 Queue 之間的對應關係,並記錄路由的 Routing-Key。Binding 資訊會儲存到系統當中,用於 Broker Server 資訊的分發依據。

回到目錄

三、RabbitMQ 應用例項

3.1 Rabbit 常用類說明

3.1.1 RabbitTemplate 類

Spring 框架已經封裝了 RabbitTemplate  對 RabbitMQ 的繫結、佇列傳送、接收進行簡化管理

方法 說明
void setExchange(String exchange)   設定繫結的 exchange 名稱
String getExchange() 獲取已繫結的 exchange 名稱
void setRoutingKey(String routingKey) 設定繫結的 routingKey
String getRoutingKey() 獲取已繫結的 routingKey
void send(String exchange, String routingKey, Message message,CorrelationData data) 以Message方式傳送資訊到 Broken Server,CorrelationData 為標示符可為空
void convertAndSend(String exchange, String routingKey, Object object, CorrelationData data) 以自定義物件方式傳送資訊到 Broken Server,系統將自動把 object轉換成 Message,CorrelationData 為標示符可為空
Message receive(String queueName, long timeoutMillis) 根據queueuName接收佇列傳送Message資訊
Object receiveAndConvert(String queueName, long timeoutMillis) 根據queueuName接收佇列物件資訊
void setReceiveTimeout(long receiveTimeout) 設定接收過期時間
void setReplyTimeout(long replyTimeout) 設定重發時間
void setMandatory(boolean mandatory) 開啟強制委託模式(下文會詳細說明)
void setConfirmCallback(confirmCallback) 繫結訊息確認回撥方法(下文會詳細說明)
void setReturnCallback(returnCallback) 繫結訊息退出回撥方法(下文會詳細說明)


3.2  初探 RabbitMQ 

在官網下載併成功安裝完 RabbitMQ 後,開啟預設路徑 http://localhost:15672/#/ 即可看到 RabbitMQ 服務中心的管理介面

 

3.2.1 Producer 端開發

先在 pom 中新增 RabbitMQ 的依賴,並在 application.yml 中加入 RabbitMQ 帳號密碼等資訊。此例子,我們嘗試使用 Direct 交換器把佇列傳送到不同的 Consumer。

 1 **********************pom *************************
 2 <project>
 3         .............
 4     <dependency>
 5         <groupId>org.springframework.boot</groupId>
 6         <artifactId>spring-boot-starter-amqp</artifactId>
 7         <version>2.0.5.RELEASE</version>
 8     </dependency>
 9 </project>
10 
11 ****************  application.yml  ****************
12 spring:
13   application:
14      name: rabbitMqProducer
15   rabbitmq:
16     host: localhost 
17     port: 5672
18     username: admin
19     password: 12345678
20     virtual-host: /LeslieHost

首先使用 CachingConnectionFactory 建立連結,通過 BindingBuilder 繫結 Exchange、Queue、RoutingKey之間的關係。
然後通過 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把資訊傳送到 Broken Server

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 public class BindingConfig {
32     public final static String first="direct.first";
33     public final static String second="direct.second";
34     public final static String Exchange_NAME="directExchange";
35     public final static String RoutingKey1="directKey1";
36     public final static String RoutingKey2="directKey2";
37     
38     @Bean
39     public Queue queueFirst(){
40         return new Queue(first);
41     }
42     
43     @Bean
44     public Queue queueSecond(){
45         return new Queue(second);
46     }
47     
48     @Bean
49     public DirectExchange directExchange(){
50         return new DirectExchange(Exchange_NAME,true,true);
51     }
52     
53     //利用BindingBuilder繫結Direct與queueFirst
54     @Bean
55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
57     }
58     
59     //利用BindingBuilder繫結Direct與queueSecond
60     @Bean
61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
63     }   
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71     
72     @RequestMapping("/send")
73     public void send() {
74         for(int n=0;n<100;n++){   
75 
76             template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue!   "+String.valueOf(n),getCorrelationData());
77             template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue!  "+String.valueOf(n),getCorrelationData());
78         }
79     }
80 
81      private CorrelationData getCorrelationData(){
82         return new CorrelationData(UUID.randomUUID().toString());
83     }
84 }

此時,開啟 RabbitMQ 管理介面,可看到 Producer 已經向 Broken Server 的 direct.first / direct.second 兩個 Queue 分別傳送100 個 Message

3.2.2  Consumer 端開發

分別建立兩個不同的 Consumer ,一個繫結 direct.first 別一個繫結 direct.second , 然後通過註解 @RabbitListener 監聽不同的 queue,當接到到 Producer 推送佇列時,顯示佇列資訊。

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 public class BindingConfig {
32     public final static String first="direct.first";
33     public final static String Exchange_NAME="directExchange";
34     public final static String RoutingKey1="directKey1";
35     
36     @Bean
37     public Queue queueFirst(){
38         return new Queue(first);
39     }
40     
41     @Bean
42     public DirectExchange directExchange(){
43         return new DirectExchange(Exchange_NAME);
44     }
45     
46     //利用BindingBuilder繫結Direct與queueFirst
47     @Bean
48     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
49         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
50     }  
51 }
52 
53 @Configuration
54 @RabbitListener(queues="direct.first")
55 public class RabbitMqListener {
56     
57     @RabbitHandler
58     public void handler(String message){
59         System.out.println(message);
60     }
61 }
62 
63 @SpringBootApplication
64 public class App {
65     
66     public static void main(String[] args){
67         SpringApplication.run(App.class, args);
68     }
69 }

執行後可以觀察到不同的 Consumer 會收到不同佇列的訊息

如果覺得使用 Binding 程式碼繫結過於繁瑣,還可以直接在監聽類RabbitMqListener中使用 @QueueBinding 註解繫結

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.second"),
34 key="directKey2"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }

執行結果

 

回到目錄

四、Producer 端的訊息傳送與監控

前面一節已經介紹了RabbitMQ的基本使用方法,這一節將從更深入的層面講述 Producer 的應用。
試想一下這種的情形,如果因 RabbitTemplate 傳送時 Exchange 名稱繫結錯誤,或 Broken Server 因網路問題或服務負荷過大引發異常,Producer 傳送的佇列丟失,系統無法正常工作。此時,開發人員應該進行一系列應對措施進行監測,確保每個資料都能正常推送到 Broken Server 。有見及此,RabbitMQ 專門為大家提供了兩種解決方案,一是使用傳統的事務模式,二是使用回撥函式,下面為大家作詳介紹。

4.1 Producer 端的事務管理

在需要使用事務時,可以通過兩種方法
第一可以呼叫 channel 類的方法以傳統模式進行管理,事務開始時呼叫 channel.txSelect(),資訊傳送後進行確認 channel.txCommit(),一旦捕捉到異常進行回滾 channel.txRollback(),最後關閉事務。

 1 @Controller
 2 @RequestMapping("/producer")
 3 public class ProducerController {
 4     @Autowired
 5     private RabbitTemplate template;
 6  
 7     @RequestMapping("/send")
 8     public void send1(HttpServletResponse response) 
 9         throws InterruptedException, IOException,  TimeoutException{
10         Channel channel=template.getConnectionFactory().createConnection().createChannel(true);
11         .......
12         try{
13             channel.txSelect();
14             channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes());
15             channel.txCommit();
16         }catch(Exception e){
17             channel.txRollback();
18         }finally{
19             channel.close();
20         }
21         ......
22         ......
23         ......
24     }
25 }

第二還可以直接通過 RabbitTemplate 的配置方法 void setChannelTransacted(bool isTransacted) 直接開啟事務

 1 public class ProducerController {
 2     @Autowired
 3     private ConnectionConfig connection;
 4 
 5     @Autowired
 6     @Bean
 7     private RabbitTemplate template(){
 8         RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory());
 9         template.setChannelTransacted(true);
10         return template;
11     }
12  
13     @RequestMapping("/send")
14     @Transactional(rollbackFor=Exception.class)
15     public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{
16         ..........
17         ..........
18         ..........
19     }
20 }

 

4.2 利用 ConfirmCallback 回撥確認訊息是否成功傳送到 Exchange 

使用事務模式消耗的系統資源比較大,系統往往會處理長期等待的狀態,在併發量較高的時候也有可能造成死鎖的隱患。有見及此,系統提供了輕量級的回撥函式方式進行非同步處理。
當需要確認訊息是否成功傳送到 Exchange 的時候,可以使用 ConfirmCallback 回撥函式。使用該函式,系統推送訊息後,該執行緒便會得到釋放,等 Exchange 接收到訊息後系統便會非同步呼叫 ConfirmCallback 繫結的方法進行處理。ConfirmCallback 只包含一個方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法會把每條資料傳送到 Exchange 時候的 ack 狀態(成功/失敗),cause 成敗原因,及對應的 correlationData(CorrelationData 只包含一個屬性 id,是繫結傳送物件的唯一識別符號) 返還到 Producer,讓Producer 進行相應處理。

注意:在繫結 ConfirmCallback 回撥函式前,請先把  publisher-confirms 屬性設定為 true

 1 spring:
 2   application:
 3      name: rabbitmqproducer
 4   rabbitmq:
 5     host: 127.0.0.1 
 6     port: 5672
 7     username: admin
 8     password: 12345678
 9     virtual-host: /LeslieHost

例如:下面的例子,特意將 RabbitTemplate 傳送時所繫結的 Exchange 名稱填寫為錯誤名稱 “ ErrorExchange ”,造成傳送失敗,然後在回撥函式中檢查失敗的原因。

Producer 端程式碼: 

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         System.out.println(host);
22         factory.setHost(host);
23         factory.setPort(port);
24         factory.setUsername(username);
25         factory.setPassword(password);
26         factory.setVirtualHost(virtualHost);
27         factory.setPublisherConfirms(true);
28         factory.setPublisherReturns(true);
29         return factory;
30     }
31 }
32 
33 @Configuration
34 public class BindingConfig {
35     public final static String first="direct.first";
36     public final static String Exchange_NAME="directExchange";
37     public final static String RoutingKey1="directKey1";
38     
39     @Bean
40     public Queue queueFirst(){
41         return new Queue(first);
42     }
43 
44     @Bean
45     public DirectExchange directExchange(){
46         return new DirectExchange(Exchange_NAME);
47     }
48     
49     @Bean
50     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
51         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
52     }  
53 }
54 
55 @Component
56 public class MyConfirmCallback implements ConfirmCallback {
57     
58     @Override
59     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
60         // TODO 自動生成的方法存根
61         // TODO 自動生成的方法存根
62         if(ack){
63             System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause);
64         }else
65             System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause);
66     }
67 }
68 
69 @Controller
70 @RequestMapping("/producer")
71 public class ProducerController {
72     @Autowired
73     private RabbitTemplate template;
74     @Autowired
75     private MyConfirmCallback confirmCallback;
76 
77     @RequestMapping("/send")
78     public void send() {
79         template.setConfirmCallback(confirmCallback);       
80         for(int n=0;n<2;n++){   
81             template.convertAndSend("ErrorExchange",
82                      BindingConfig.RoutingKey1,"I'm the first queue!   "
83                      +String.valueOf(n),getCorrelationData());
84         }
85     }
86 
87      private CorrelationData getCorrelationData(){
88         return new CorrelationData(UUID.randomUUID().toString());
89     }
90 }    

Consumer端程式碼

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }

執行結果:

 

4.3 繫結 CorrelationData 與傳送物件的關係

上面的例子當中,CorrelationData 只是用一個隨機的 UUID 作為 CorrelationID,而在現實的應用場景中,由於 ConfirmCallback 只反回標識值 CorrelationData,而沒有把佇列裡的物件值也一同返回。所以,在推送佇列時可以先用 Key-Value 儲存 CorrelationID 與所傳送資訊的關係,這樣當 ConfirmCallback 回撥時,就可根據 CorrelationID 找回物件,作進一步處理。
下面例子,我們把要傳送的物件放在虛擬資料 DataSource 類中,用 DataRelation 記錄 CorrelationID 與傳送物件 OrderID 的關係,然後在回撥函式 ConfirmCallback 中根據 CorrelationID 查詢對應的 OrderEntity,如果傳送成功,則刪除繫結。如果傳送失敗,可以重新發送或根據情況再作處理。

Producer端程式碼:

  1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         System.out.println(host);
 22         factory.setHost(host);
 23         factory.setPort(port);
 24         factory.setUsername(username);
 25         factory.setPassword(password);
 26         factory.setVirtualHost(virtualHost);
 27         factory.setPublisherConfirms(true);
 28         factory.setPublisherReturns(true);
 29         return factory;
 30     }
 31 }
 32 
 33 @Configuration
 34 public class BindingConfig {
 35     public final static String first="direct.first";
 36     //Exchange 使用 direct 模式     
 37     public final static String Exchange_NAME="directExchange";
 38     public final static String RoutingKey1="directKey1";
 39     
 40     @Bean
 41     public Queue queueFirst(){
 42         return new Queue(first);
 43     }
 44     
 45     @Bean
 46     public DirectExchange directExchange(){
 47         return new DirectExchange(Exchange_NAME);
 48     }
 49     
 50     @Bean
 51     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 52         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 53     }
 54 }
 55 
 56 @Data
 57 public class OrderEntity implements Serializable{
 58     private String id;
 59     private String goods;
 60     private Double price;
 61     private Integer count;
 62     
 63     public OrderEntity(String id,String goods,Double price,Integer count){
 64         this.id=id;
 65         this.goods=goods;
 66         this.price=price;
 67         this.count=count;
 68     }
 69     
 70     public OrderEntity(){}
 71     
 72     public String getId() {
 73         return id;
 74     }
 75     public void setId(String id) {
 76         this.id = id;
 77     }
 78 
 79     public String getGoods() {
 80         return goods;
 81     }
 82 
 83     public void setGoodsId(String goods) {
 84         this.goods = goods;
 85     }
 86 
 87     public Integer getCount() {
 88         return count;
 89     }
 90 
 91     public void setCount(Integer count) {
 92         this.count = count;
 93     }
 94 
 95     public Double getPrice() {
 96         return price;
 97     }
 98 
 99     public void setPrice(Double price) {
100         this.price = price;
101     }
102 }
103 
104 @Component
105 public class DataSource {
106     //加入虛擬資料
107     private static List<OrderEntity> list=new ArrayList<OrderEntity>(
108             Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
109                           new OrderEntity("002","Huwei P30 Plus",5400.00,1),
110                           ..........));
111     
112     public DataSource(){
113     }
114     
115     public List<OrderEntity> getOrderList(){
116         return list;
117     }
118     
119     //根據Id獲取對應order
120     public OrderEntity getOrder(String id){
121         for(OrderEntity order:list){
122             if(order.getId()==id)
123                 return order;
124         }
125         return null;
126     }
127 }
128 
129 public class DataRelation {
130     public static Map map=new HashMap();
131     
132     //繫結關係
133     public static void add(String key,String value){
134         if(!map.containsKey(key))
135             map.put(key,value);
136     }
137     
138     //返回orderId
139     public static Object get(String key){
140         if(map.containsKey(key))
141             return map.get(key);
142         else
143             return null;
144     }
145     
146     //根據 orderId 刪除繫結關係
147     public static void del(String key){
148         if(map.containsKey(key))
149            map.remove(key);
150     }
151 }
152 
153 @Component
154 public class MyConfirmCallback implements ConfirmCallback {
155     @Autowired
156     private DataSource datasource;
157     
158     @Override
159     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
160         String correlationId=correlationData.getId();
161         //根據 correclationId取回對應的orderId
162         String orderId=DataRelation.get(correlationId).toString();
163         //在datasource中找回對應的order
164         OrderEntity order=datasource.getOrder(orderId);
165         
166         if(ack){
167             System.out.println("--------------------ConfirmCallback-------------------\n"                 
168                 +" order's ack is true!\nId:"+order.getId()+"  Goods:"+order.getGoods()
169                 +" Count:"+order.getCount().toString()+"  Price:"+order.getPrice());
170             DataRelation.del(correlationId);    //操作完成刪除對應繫結
171         }else {
172             System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause);
173             //可在記錄日誌後把Order推送到佇列進行重新發送
174             .......
175         }
176     }
177 }
178 
179 @Controller
180 @RequestMapping("/producer")
181 public class ProducerController {
182     @Autowired
183     private RabbitTemplate template;
184     @Autowired
185     private MyConfirmCallback confirmCallback;
186     @Autowired
187     private DataSource dataSource;
188 
189     @RequestMapping("/send")
190     public void send() throws InterruptedException, IOException{
191         //繫結 ConfirmCallback 回撥函式
192         template.setConfirmCallback(confirmCallback);
193  
194         for(OrderEntity order:dataSource.getOrderList()){
195             CorrelationData correlationData=getCorrelationData();
196             //儲存 CorrelationId 與 orderId關係
197             DataRelation.add(correlationData.getId(), order.getId());
198             //把 order 插入佇列
199             template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData);
200         }
201     }
202     
203     private CorrelationData getCorrelationData(){
204         return new CorrelationData(UUID.randomUUID().toString());
205     }
206 }

Consumer 端程式碼

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }

執行結果

 

4.4 利用 ReturnCallback 處理佇列 Queue 錯誤

使用 ConfirmCallback 函式只能判斷訊息是否成功傳送到 Exchange,但並不能保證訊息已經成功進行佇列 Queue。所以,系統預備了另一個回撥函式 ReturnCallback 來監聽 Queue 佇列處理的成敗。如果佇列錯誤繫結不存在的 queue,或者 Broken Server 瞬間出現問題末能找到對應的 queue,系統就會激發 Producer 端 ReturnCallback 的回撥函式來進行錯誤處理。 ReturnCallback 回撥介面只包含一個方法 void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey),它會把出錯的 replyCode,replyText,exchange,routingKey等值都一起返還。與 ConfirmCallback 不同的是,returnedMessage 會把佇列中的物件儲存到 Message 的 Body 屬性中並返還到回撥函式。

注意:在繫結 ReturnCallback 回撥函式前,請先把  publisher-returns 及 mandatory 屬性設定為 true 。 mandatory 引數預設為 false,用於判斷 broken server是否把錯誤的物件返還到  Producer。如末進行設定,系統將把錯誤的訊息丟棄。

下面例子我們在呼叫 convertAndSend 方法時特意把 routingKey 設定為 ErrorKey,觸發 ReturnCallback 回撥,然後在 ReturenCallback 的回撥方法顯示 replyCode,replyText,exchange,routingKey 等值,並把佇列中物件屬性一併顯示。

Producer 端程式碼

  1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         System.out.println(host);
 22         factory.setHost(host);
 23         factory.setPort(port);
 24         factory.setUsername(username);
 25         factory.setPassword(password);
 26         factory.setVirtualHost(virtualHost);
 27         factory.setPublisherConfirms(true);
 28         factory.setPublisherReturns(true);
 29         return factory;
 30     }
 31 }
 32 
 33 @Configuration
 34 public class BindingConfig {
 35     public final static String first="direct.first";
 36     public final static String Exchange_NAME="directExchange";
 37     public final static String RoutingKey1="directKey1";
 38     
 39     @Bean
 40     public Queue queueFirst(){
 41         return new Queue(first);
 42     }
 43 
 44     @Bean
 45     public DirectExchange directExchange(){
 46         return new DirectExchange(Exchange_NAME);
 47     }
 48     
 49     @Bean
 50     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 51         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 52     } 
 53 }
 54 
 55 @Data
 56 public class OrderEntity implements Serializable{
 57     private String id;
 58     private String goods;
 59     private Double price;
 60     private Integer count;
 61     
 62     public OrderEntity(String id,String goods,Double price,Integer count){
 63         this.id=id;
 64         this.goods=goods;
 65         this.price=price;
 66         this.count=count;
 67     }
 68     
 69     public OrderEntity(){}
 70     
 71     public String getId() {
 72         return id;
 73     }
 74     public void setId(String id) {
 75         this.id = id;
 76     }
 77 
 78     public String getGoods() {
 79         return goods;
 80     }
 81 
 82     public void setGoodsId(String goods) {
 83         this.goods = goods;
 84     }
 85 
 86     public Integer getCount() {
 87         return count;
 88     }
 89 
 90     public void setCount(Integer count) {
 91         this.count = count;
 92     }
 93 
 94     public Double getPrice() {
 95         return price;
 96     }
 97 
 98     public void setPrice(Double price) {
 99         this.price = price;
100     }
101 }
102 
103 @Component
104 public class DataSource {
105     //虛擬資料
106     private static List<OrderEntity> list=new ArrayList<OrderEntity>(
107             Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
108                           new OrderEntity("002","Huwei P30 Plus",5400.00,1),
109                           ......));
110     public DataSource(){
111     }
112     
113     public List<OrderEntity> getOrderList(){
114         return list;
115     }
116     
117     //根據Id獲取對應order
118     public OrderEntity getOrder(String id){
119         for(OrderEntity order:list){
120             if(order.getId()==id)
121                 return order;
122         }
123         return null;
124     }
125 }
126 
127 @Component
128 public class MyReturnCallback implements ReturnCallback {
129 
130     @Override
131     public void returnedMessage(Message message, int replyCode, 
132             String replyText, String exchange, String routingKey){
133         //把messageBody反序列化為 OrderEntity物件
134         OrderEntity order=convertToOrder(message.getBody());
135         //顯示錯誤原因
136         System.out.println("-------------ReturnCallback!------------\n"
137             +" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode)
138             +" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId()
139             +"  Goods:"+order.getGoods()+"  Count:"+order.getCount().toString()
140             +"  Price:"+order.getPrice()+" ");
141     }
142     
143     //把byte[]反序列化為 OrderEntity物件
144     private OrderEntity convertToOrder(byte[] bytes){
145         OrderEntity order=null;
146         ByteArrayInputStream bis = new ByteArrayInputStream (bytes);        
147         ObjectInputStream ois;
148         try {
149             ois = new ObjectInputStream (bis);
150             Object obj = ois.readObject();
151             order=(OrderEntity)obj;
152             ois.close();   
153             bis.close(); 
154         } catch (IOException | ClassNotFoundException e) {
155             // TODO 自動生成的 catch 塊
156             e.printStackTrace();
157         }        
158         return order;
159     }
160 }
161 
162 @Controller
163 @RequestMapping("/producer")
164 public class ProducerController {
165     @Autowired
166     private RabbitTemplate template;
167     @Autowired
168     private MyReturnCallback returnCallback;
169     @Autowired
170     private DataSource dataSource;
171  
172     
173     @RequestMapping("/send")
174     public void send() throws InterruptedException, IOException{
175         //把 mandatory 屬性設定為true
176         template.setMandatory(true);
177         //繫結 ReturnCallback 回撥函式
178         template.setReturnCallback(returnCallback);
179  
180         for(OrderEntity order:dataSource.getOrderList()){
181             CorrelationData correlationData=getCorrelationData();
182             template.convertAndSend("directExchange","ErrorKey",order,correlationData);
183         }
184     }
185     
186     private CorrelationData getCorrelationData(){
187         return new CorrelationData(UUID.randomUUID().toString());
188     }
189 }

Consumer 程式碼

 1 @Configuration
 2 public class ConnectionConfig {
 3     @Value("${spring.rabbitmq.host}")
 4     public String host;
 5     
 6     @Value("${spring.rabbitmq.port}")
 7     public int port;
 8     
 9     @Value("${spring.rabbitmq.username}")
10     public String username;
11     
12     @Value("${spring.rabbitmq.password}")
13     public String password;
14     
15     @Value("${spring.rabbitmq.virtual-host}")
16     public String virtualHost;
17 
18     @Bean
19     public ConnectionFactory getConnectionFactory(){
20         CachingConnectionFactory factory=new CachingConnectionFactory();
21         factory.setHost(host);
22         factory.setPort(port);
23         factory.setUsername(username);
24         factory.setPassword(password);
25         factory.setVirtualHost(virtualHost);
26         return factory;
27     }
28 }
29 
30 @Configuration
31 @RabbitListener(bindings=@QueueBinding(
32 exchange=@Exchange(value="directExchange"),
33 value=@Queue(value="direct.first"),
34 key="directKey1"))
35 public class RabbitMqListener {
36     
37     @RabbitHandler
38     public void handler(String message){
39         System.out.println(message);
40     }
41 }
42 
43 @SpringBootApplication
44 public class App {
45     
46     public static void main(String[] args){
47         SpringApplication.run(App.class, args);
48     }
49 }

執行結果:

回到目錄

 

五、Consumer 訊息接收管控 

在第四節主要介紹了 Producer 端的佇列傳送與監控,它只能管理 Producer 與 Broker Server 之間的通訊,但並不能確認 Consumer 是否能成功接收到佇列,在這節內容將介紹 Consumer 端的佇列接收與監聽。前面幾節裡,Consumer 端都是簡單地直接使用 RabbitListener 對佇列進行監聽,其實 RabbitMQ 已經為使用者準備了功能更強大的 MessageListenerContainer 容器用於管理 Message ,下面將為大家介紹。

5.1 AbstractMessageListenerContainer 介紹

AbstractMeessageListenerContainer 虛擬類是 RabbitMQ 封裝好的一個容器,本身並沒有對訊息進行處理,而是把訊息的處理方式交給了 MessageListener 。而它的主要功能是實現 MessageListener 的繫結,ApplicationContext 上下文的繫結,ErrorHandler 的錯誤處理方法的繫結、對訊息消費的開始、結束等等預設引數進行配置,讓開發人員可以在容器中對 Consumer 實現統一管理。SimpleMessageListenerContainer、DirectMessageLinstenerCoontainer 都是它的子類,分別應用於不同的場景,在下面會再作詳細介紹。

方法 說明
void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) 設定訊息接收確認的模式(下文會有詳細介紹)
AcknowledgeMode getAcknowledgeMode() 獲取訊息接收確認的模式(下文會有詳細介紹)
void setPrefetchCount(int prefetchCount) 設定每個 consumer 每次可接收到訊息的最大數量
void setQueues(Queue... queues) 設定監聽Queue佇列
void addQueues(Queue... queues) 加入監聽Queue佇列
void setMessageListener(Object messageListener) 繫結MessageListener,對資訊進行處理
void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener) 繫結ChannelAwareMessageListener,對資訊進行處理,同時可獲取當前使用的channel資訊
Object getMessageListener() 獲取MessageListener物件
void setMessageConverter(MessageConverter messageConverter) 繫結MessageConverter訊息轉換物件 
void setApplicationContext(ApplicationContext applicationContext) 繫結ApplicationContext上下文
ConnectionFactory getConnectionFactory() 獲取ConnectionFactory連線工廠
void setListenerId(String listenerId) 設定ListenerId

MessageListener 是監聽訊息最常用 Listener,它只包含了一個方法 void onMessage(Message message),這是訊息接收最常用的一個方法,開發者只需要實現此方法即可對接收到的 Message 進行處理。
ChannelAwareMessageListener 相當於是 MessageListener的一個擴充套件,包含了方法 void onMessage(Message message, Channel channel),除了對 Message 進行處理外,還可以對接收此 Message 的 Channel 進行檢測。

5.2 SimpleMessageListenerContainer 常用方法

SimpleMessageListenerContainer 是最常用的 MessageListener 容器,它可以通過下面的方法設定預設消費者數量與最大的消費者數量。下面例子中嘗試把 consurrentConsumers 設定為3,把maxConcurrentConsumers 設定為4,並同時監控 direct 模式交換器的 direct.first,direct.second 佇列。

方法 說明
void setConcurrentConsumers(final int concurrentConsumers) 設定當前佇列中消費者數量
void setMaxConcurrentConsumers(int maxConcurrentConsumers) 設定當前佇列中最大消費者數量

通過截圖可以看到,系統預設會為每個 queue 都建立 3 個 consumers,不同的 queue 中的 consumers 是共享相同的 3 個 channel 。

當 Producer 端傳送訊息時,consumers 的實際數量可根據 maxConcurrentConsumers 的配置限制進行擴充套件。 

Producer 端程式碼

 1 @Configuration
 2 public class BindingConfig {
 3     public final static String first="direct.first";
 4     public final static String second="direct.second";
 5     public final static String Exchange_NAME="directExchange";
 6     public final static String RoutingKey1="directKey1";
 7     public final static String RoutingKey2="directKey2";
 8     
 9     @Bean
10     public Queue queueFirst(){
11         return new Queue(first);
12     }
13     
14     @Bean
15     public Queue queueSecond(){
16         return new Queue(second);
17     }
18     
19     @Bean
20     public DirectExchange directExchange(){
21         return new DirectExchange(Exchange_NAME);
22     }
23     
24     //利用BindingBuilder繫結Direct與queueFirst
25     @Bean
26     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28     }
29     
30     //利用BindingBuilder繫結Direct與queueSecond
31     @Bean
32     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
33         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34     }   
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39     @Value("${spring.rabbitmq.host}")
40     public String host;
41     
42     @Value("${spring.rabbitmq.port}")
43     public int port;
44     
45     @Value("${spring.rabbitmq.username}")
46     public String username;
47     
48     @Value("${spring.rabbitmq.password}")
49     public String password;
50     
51     @Value("${spring.rabbitmq.virtual-host}")
52     public String virtualHost;
53 
54     @Bean
55     public ConnectionFactory getConnectionFactory(){
56         CachingConnectionFactory factory=new CachingConnectionFactory();
57         factory.setHost(host);
58         factory.setPort(port);
59         factory.setUsername(username);
60         factory.setPassword(password);
61         factory.setVirtualHost(virtualHost);
62         return factory;
63     }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autowired
70     private RabbitTemplate template;
71  
72     @RequestMapping("/send")
73     public void send(HttpServletResponse response) throws InterruptedException, IOException{
74         for(Integer n=0;n<100;n++){
75                 CorrelationData correlationData=getCorrelationData();
76                 template.convertAndSend("directExchange","directKey1", 
77                          "queue1"+"  "+n.toString(),correlationData);
78                 template.convertAndSend("directExchange","directKey2"," queue2"+"  "+n.toString(),correlationData);            
79                 Thread.currentThread().sleep(30);
80         }
81     }
82 
83     private CorrelationData getCorrelationData(){
84         return new CorrelationData(UUID.randomUUID().toString());
85     }
86 }

Consumer 端程式碼:

  1 @Configuration
  2 public class ConnectionConfig {
  3     @Value("${spring.rabbitmq.host}")
  4     public String host;
  5     
  6     @Value("${spring.rabbitmq.port}")
  7     public int port;
  8     
  9     @Value("${spring.rabbitmq.username}")
 10     public String username;
 11     
 12     @Value("${spring.rabbitmq.password}")
 13     public String password;
 14     
 15     @Value("${spring.rabbitmq.virtual-host}")
 16     public String virtualHost;
 17 
 18     @Bean
 19     public ConnectionFactory getConnectionFactory(){
 20         CachingConnectionFactory factory=new CachingConnectionFactory();
 21         factory.setHost(host);
 22         factory.setPort(port);
 23         factory.setUsername(username);
 24         factory.setPassword(password);
 25         factory.setVirtualHost(virtualHost);
 26         return factory;
 27     }
 28 }
 29 
 30 @Configuration
 31 public class BindingConfig {
 32     public final static String first="direct.first";
 33     public final static String second="direct.second";
 34     public final static String Exchange_NAME="directExchange";
 35     public final static String RoutingKey1="directKey1";
 36     public final static String RoutingKey2="directKey2";
 37     
 38     @Bean
 39     public Queue queueFirst(){
 40         return new Queue(first);
 41     }
 42     
 43     @Bean
 44     public Queue queueSecond(){
 45         return new Queue(second);
 46     }
 47     
 48     @Bean
 49     public DirectExchange directExchange(){
 50         return new DirectExchange(Exchange_NAME);
 51     }
 52     
 53     //利用BindingBuilder繫結Direct與queueFirst
 54     @Bean
 55     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 56         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
 57     }
 58     
 59     //利用BindingBuilder繫結Direct與queueSecond
 60     @Bean
 61     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
 62         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
 63     }   
 64 }
 65 @Configuration
 66 public class SimpleMessListener {
 67     @Autowired
 68     private RabbitTemplate template;
 69     private int index=0;
 70 
 71     @Bean
 72     public SimpleMessageListenerContainer messageContainer(){
 73         SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
 74         container.setConnectionFactory(connectionConfig.getConnectionFactory());
 75         // 繫結Queue1/Queue2
 76         container.setQueueNames("direct.first");      
 77         container.addQueueNames("direct.second");
 78         //設定預設 consumer 數為3
 79         container.setConcurrentConsumers(3);
 80         //設定最大 consumer 數為4
 81         container.setMaxConcurrentConsumers(4);
 82         //標記 consumerTag
 83         container.setConsumerTagStrategy(queue ->  "consumer"+(++index));
 84         //繫結MessageListener顯示接收資訊
 85         container.setMessageListener(new MessageListener(){
 86             @Override
 87             public void onMessage(Message message) {
 88                 // TODO 自動生成的方法存根
 89                 Thread thread=Thread.currentThread();
 90                 MessageProperties messProp=message.getMessageProperties();
 91                 try {
 92                     System.out.println("  ConsumerTag:"+messProp.getConsumerTag()
 93                             +"  ThreadId is:"+thread.getId()+"  Queue:"+messProp.getConsumerQueue()
 94                             +"  "+new String(message.getBody(),"UTF-8"));
 95                 } catch (UnsupportedEncodingException e) {
 96                     // TODO 自動生成的 catch 塊
 97                     e.printStackTrace();
 98                 }
 99             }
100             
101         });
102         return container;
103     }
104 }

執行結果

5.3 SimpleMessageListenerContainer 的運作原理

在 SimpleMessageListenerContainer 模式中,無論系統監聽多少個 queue 佇列,channel 都是共享的,類似上面的例子,4個 channel 會把接收到不同的佇列請求並分發到對應的 consumer 進行處理。這樣做的好處是系統可以通過 concurrentConsumers、maxConcurrentConsumers 靈活設定當前佇列中消費者的數量,系統可以跟據實際需求靈活處理。但由於每個 channel 都是在固定執行緒中執行的,一個 channel 要遊走於多個 consumer 當中,這無疑增加了系統在上下文切換中的開銷。下面用系統提供的 ChannelAwareMessageListener 介面,以更直觀的例子說明一下 SimpleMessageListenerContainer 當中 channel、queue、consumer 之間的關係。

Producer 端程式碼

 1 @Configuration
 2 public class BindingConfig {
 3     public final static String first="direct.first";
 4     public final static String second="direct.second";
 5     public final static String Exchange_NAME="directExchange";
 6     public final static String RoutingKey1="directKey1";
 7     public final static String RoutingKey2="directKey2";
 8     
 9     @Bean
10     public Queue queueFirst(){
11         return new Queue(first);
12     }
13     
14     @Bean
15     public Queue queueSecond(){
16         return new Queue(second);
17     }
18     
19     @Bean
20     public DirectExchange directExchange(){
21         return new DirectExchange(Exchange_NAME);
22     }
23     
24     //利用BindingBuilder繫結Direct與queueFirst
25     @Bean
26     public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
27         return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
28     }
29     
30     //利用BindingBuilder繫結Direct與queueSecond
31     @Bean
32     public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){       
33         return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
34     }   
35 }
36 
37 @Configuration
38 public class ConnectionConfig {
39     @Value("${spring.rabbitmq.host}")
40     public String host;
41     
42     @Value("${spring.rabbitmq.port}")
43     public int port;
44     
45     @Value("${spring.rabbitmq.username}")
46     public String username;
47     
48     @Value("${spring.rabbitmq.password}")
49     public String password;
50     
51     @Value("${spring.rabbitmq.virtual-host}")
52     public String virtualHost;
53 
54     @Bean
55     public ConnectionFactory getConnectionFactory(){
56         CachingConnectionFactory factory=new CachingConnectionFactory();
57         factory.setHost(host);
58         factory.setPort(port);
59         factory.setUsername(username);
60         factory.setPassword(password);
61         factory.setVirtualHost(virtualHost);
62         return factory;
63     }
64 }
65 
66 @Controller
67 @RequestMapping("/producer")
68 public class ProducerController {
69     @Autow