1. 程式人生 > >再看rabbitmq的交換器和佇列的關係

再看rabbitmq的交換器和佇列的關係

最近又要用到rabbitmq,業務上要求伺服器只發一次訊息,需要多個客戶端都去單獨消費。但我們知道rabbitmq的機制裡,每個佇列裡的訊息只能消費一次,所以客戶端要單獨消費資訊,就必須得每個客戶端單獨監聽一個queue。所以我最終想實現的是服務端只宣告exchange,客戶端來建立queue和繫結exchange。但是在看各種rabbitmq博文和討論的時候,我覺得對exchange的模式和queue間的關係講的都不是很清楚。所以我決定自己驗證一下 ### fanout模式和direct模式 本文主要驗證fanout模式和direct模式下以上猜想是否可行。fanout模式就是大名鼎鼎的廣播模式了,只要queue綁定了fanout的交換器,就可以直接的收到訊息,無需routingkey的參與。而direct模式就是通過routing key直接傳送到綁定了同樣routing key的佇列中。那麼,在這兩種exchange的模式下,是否都可以實現服務端僅建立exchange,客戶端建立queue並繫結exchange呢? ### Direct模式驗證 我們先把交換器、routingkey、佇列的名稱定義好: 1. 交換器為directTest 2. routingkey為direct_routing_key 3. 佇列測試3個,首先測試Direct_test_queue_1,再行測試Direct_test_queue_2,再行測試Direct_test_queue_3 程式碼使用spring boot框架快速搭建。我們先規劃好需要幾個類來完成這個事情: 1. 針對生產者,需要RabbitmqConfig,用來配置exchange的 2. 針對生產者,需要DirectRabbitSender,用來實現Direct模式的訊息傳送 3. 針對消費者,需要DirectConsumerOne,來測試第一個佇列Direct_test_queue_1生成和訊息接收 4. 針對消費者,需要DirectConsumerTwo,來測試第二個佇列Direct_test_queue_2生成和訊息接收 5. 針對消費者,需要DirectConsumerThree,來測試第三個佇列Direct_test_queue_3生成和訊息接收 6. 我們還需要一個測試類RabbitmqApplicationTests,用於測試訊息的傳送和接收 rabbitmq先配置一個DirectExchange ``` @Bean DirectExchange directExchange(){ return new DirectExchange("directTest", true, false); } ``` 我們可以看到Direct交換器的名稱定義為了directTest,這時候還未繫結任何的佇列。啟動程式,若我們的設想沒錯,則rabbitmq中應該已經生成了directTest的exchange。 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621174323528-556155747.png) Bingo!directTest交換器成功建立。接下來,我們去編寫DirectRabbitSender的程式碼 ``` @Component public class DirectRabbitSender{ @Autowired private RabbitTemplate rabbitTemplate; private final String EXCHANGE_NAME = "directTest"; private final String ROUTING_KEY = "direct_routing_key"; public void send(Object message) { rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message); } } ``` 我們可以看到程式碼中,通過rabbitTemplate傳送訊息到了交換器為directTest,routingkey為direct_routing_key的地方。但這時候我們沒有任何隊列了,自然接不到訊息。現在我們去編寫第一個消費者DirectConsumerOne來接受訊息。 ``` @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_1", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerOne { @RabbitHandler private void onMessage(String message){ System.out.println("監聽佇列Direct_test_queue_1接到訊息" + message); } } ``` 通過程式碼可以看到,我們通過@QueueBinding把Direct_test_queue_1佇列繫結到了directTest和direct_routing_key上。Direct_test_queue_1並沒有在rabbitmq建立,這並沒有關係。一般來說,@RabbitListener會自動去建立佇列。啟動程式,我們去看一下rabbitmq裡佇列是不是建立了。 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621174809717-1378930897.png) Bingo!再次驗證成功。我們去看看繫結關係是不是正確。這時候Direct_test_queue_1應該繫結到了名為directTest的交換器,而繫結的routingkey為direct_routing_key ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621174927693-1665446325.png) biubiubiu!繫結關係完全正確。到了這裡,我們進行最後一步,寫了單元測試去傳送訊息,檢視控制檯中消費者是否成功收到訊息。RabbitmqApplicationTests的程式碼如下: ``` @SpringBootTest class RabbitmqApplicationTests { @Autowired private DirectRabbitSender directRabbitSender; @Test void contextLoads() { } @Test public void directSendTest(){ directRabbitSender.send("direct-sender"); directRabbitSender.send("direct-sender_test"); } } ``` 啟動測試類,然後去檢視控制檯。 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621175149210-524553250.png) 沒錯,這就是我們想要達到的效果!基本可以宣佈Direct模式驗證成功。服務端生成exchange,客戶端去生成佇列繫結的方式在direct模式下完全可行。為了保險起見,再驗證一下生成多個消費者繫結到同一個佇列是否可行。 DirectConsumerTwo程式碼如下: ``` @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_2", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerTwo { @RabbitHandler private void onMessage(String message){ System.out.println("監聽佇列Direct_test_queue_2接到訊息" + message); } } ``` DirectConsumerThree程式碼如下: ``` @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_3", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerThree { @RabbitHandler private void onMessage(String message){ System.out.println("監聽佇列Direct_test_queue_3接到訊息" + message); } } ``` 啟動測試類,我們去看兩個地方: 1. rabbitmq是否建立了客戶端繫結的三個佇列Direct_test_queue_1、Direct_test_queue_2、Direct_test_queue_3 2. 消費者應該各自收到2條訊息(Test中傳送了兩條,參看上面 RabbitmqApplicationTests 的程式碼)。那3個佇列,控制檯中應該列印了6條訊息。 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621175700097-134202548.png) hohohoho!建立成功,並且繫結關係我看了也全都正確。我們去看控制檯 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621175739252-1502585841.png) 6條!沒有任何毛病,至此,可以宣佈Direct模式下,完全支援我們最初的想法:服務端生成exchange,客戶端去生成佇列繫結的方式在direct模式下完全可行。 ### fanout模式驗證 接下來我們驗證一下fanout的方式,基本操作流程和Direct模式一致。程式碼的結構也差不多: 1. 針對生產者,需要RabbitmqConfig,直接在Direct模式下的rabbitmqConfig裡直接新增Fanout的交換器配置 2. 針對生產者,需要FanoutRabbitSender,用來實現Fanout模式的訊息傳送 3. 針對消費者,需要FanoutConsumerOne,來測試第一個佇列Fanout_test_queue_1生成和訊息接收 4. 針對消費者,需要FanoutConsumerTwo,來測試第二個佇列Fanout_test_queue_2生成和訊息接收 5. 針對消費者,需要FanoutConsumerThree,來測試第三個佇列Fanout_test_queue_3生成和訊息接收 6. 測試類RabbitmqApplicationTests也直接複用Direact模式下測試的類 我就不多BB,直接上程式碼了。 RabbitmqConfig程式碼如下 ``` @Configuration public class RabbitmqConfig { @Bean DirectExchange directExchange(){ return new DirectExchange("directTest", true, false); } @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutTest", true, false); } } ``` FanoutRabbitSender的程式碼如下,此處和direct模式的區別是Fanout中沒有routingkey,所以程式碼裡也沒定義routingkey: ``` @Component public class FanoutRabbitSender{ @Autowired private RabbitTemplate rabbitTemplate; private final String EXCHANGE_NAME = "fanoutTest"; public void send(Object message) { rabbitTemplate.convertAndSend(EXCHANGE_NAME, null, message); } } ``` 我們到這裡先啟動程式試試,看看fanoutTest的交換器在沒有繫結佇列的情況下是否生成了。 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621180312431-819119539.png) 棒棒棒!和我們想的一樣,那接下來去寫完所有的消費者,這裡和Direct模式最重要的區別是@Exchange中必須要指定type為fanout。direct模式的程式碼裡沒指定是因為@Exchange的type預設值就是direct。我直接上程式碼了: ``` /** * 監聽器主動去宣告queue=fanout_test_queue_1,並繫結到fanoutTest交換器 */ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_1", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerOne { @RabbitHandler private void onMessage(String message){ System.out.println("監聽佇列fanout_test_queue_1接到訊息" + message); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_2", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerTwo { @RabbitHandler private void onMessage(String message){ System.out.println("監聽佇列fanout_test_queue_2接到訊息" + message); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_3", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerThree { @RabbitHandler private void onMessage(String message){ System.out.println("監聽佇列fanout_test_queue_3接到訊息" + message); } } ``` 接著去測試類RabbitmqApplicationTests中加上fanout的傳送測試,然後註釋掉direct的單元測試,以便一會造成干擾 ``` @SpringBootTest class RabbitmqApplicationTests { @Autowired private DirectRabbitSender directRabbitSender; @Autowired private FanoutRabbitSender fanoutRabbitSender; @Test void contextLoads() { } // @Test // public void directSendTest(){ // directRabbitSender.send("direct-sender"); // directRabbitSender.send("direct-sender_test"); // } @Test public void fanoutSendTest(){ fanoutRabbitSender.send("fanout-sender_1"); fanoutRabbitSender.send("fanout-sender_2"); } } ``` 程式碼都完成了,現在我們啟動測試類,看看控制檯是否正常收到了訊息 ![](https://img2020.cnblogs.com/blog/646489/202006/646489-20200621180818384-418666835.png) 看圖看圖,fanout模式下也完全認證成功!!!那我們可以宣佈,文章開頭的猜想完全可以實現。 ### 總結 服務端只宣告exchange,客戶端來建立queue和繫結exchange的方式完全可行。並且在Direct和Fanout模式下都可行。 那我們可以推測在Header模式的交換器和Topic模式的交換器下應該也大差不差。具體各位可自行驗證,基本流程和上面direct和fanout的流程差