官網英文版學習——RabbitMQ學習筆記(七)Topic
在上一篇中使用直接交換器改進了我們的系統,使得它能夠有選擇的進行接收消息,但它仍然有局限性——它不能基於多個條件進行路由。本節我們就進行能夠基於多個條件進行路由的topics exchange學習。
發送給主題交換器的消息不能是任意的routing_key—它必須是一個單詞列表,由點分隔。這些詞可以是任意的,但通常它們指定與消息相關的一些特性。幾個有效的路由示例:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".。路由鍵中可以有任意多的字,最多可以有255個字節。
路由鍵也需要是相同的形式,topic交換器背後的邏輯類似於direct交換器——發送帶有特定路由鍵的消息將被傳送到綁定匹配路由鍵的所有隊列中,
然而,有兩個重要的特殊情況需要綁定鍵:
- * (star) can substitute for exactly one word. 星號可以代替一個詞
- # (hash) can substitute for zero or more words. 哈希可以代替零個或多個詞
- 通過下面的例子進行解釋:
- 在這個例子中,我們將發送所有描述動物的信息。消息將通過一個包含三個單詞(兩個點)的路由鍵發送。路徑鍵中的第一個詞將描述速度,第二個是顏色,第三個是物種:“<速度>.<顏色>.<物種>”。
-
We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".
These bindings can be summarised as:
-
Q1對所有的橙色動物都感興趣。
-
Q2希望聽到關於兔子的一切,以及關於懶惰動物的一切。
-
設置了路由鍵為 "quick.orange.rabbit"的消息將被投遞到兩個隊列,消息 "lazy.orange.elephant" 也被投遞到他們兩個,而"quick.orange.fox
-
通過上面的學習,我們知道,topic主題的交換器投遞消息與redict交換器的不同在於,交換器類型和路由鍵的模糊匹配,現在我們就去把之前的代碼進行改變,只需要將代碼中的交換器類型改為topic,並將綁定的路由鍵更改一下,投遞消息用的是確定的路由鍵,接收消息通過設置匹配的模糊綁定鍵,可以訂閱到多條件的消息,接下來上代碼,並將改動的代碼以下劃線形式標記出來。
-
發送方代碼:
-
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Publish { private static final String EXCHANGE_NAME = "exchangeC"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub // 創建工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); // 創建連接 Connection connetion = factory.newConnection(); // 獲得信道 Channel channel = connetion.createChannel(); // 聲明交換器(聲明了一個名字位exchangeA,類型修改fanout為direct類型的交換器) channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String message = "555,2,2,33,66"; // 發送消息,將第二項參數routingkey channel.basicPublish(EXCHANGE_NAME, "my.hello.haha", null, message.getBytes()); System.out.println(" [x] Sent ‘" + message + "‘"); channel.close(); connetion.close(); } }
-
接收方一:
-
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Subscribe { private static final String EXCHANGE_NAME = "exchangeC"; private static final String QUEUE_NAME = "queueA"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub // 創建工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); // 創建連接 Connection connetion = factory.newConnection(); // 獲得信道 Channel channel = connetion.createChannel(); // 聲明交換器(聲明了一個名字位exchangeA,類型修改fanout為direct的交換器) channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 聲明一個隊列,在此采用臨時隊列 String queueName = channel.queueDeclare().getQueue(); // channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 隊列和交換器進行綁定,並設定路由鍵為error channel.queueBind(queueName, EXCHANGE_NAME, "my.#"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"utf-8"); System.out.println("[x] received‘"+message+"‘"); } }; channel.basicConsume(queueName, consumer); } }
-
接收方二:
-
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Subscribe { private static final String EXCHANGE_NAME = "exchangeC"; private static final String QUEUE_NAME = "queueA"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub // 創建工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); // 創建連接 Connection connetion = factory.newConnection(); // 獲得信道 Channel channel = connetion.createChannel(); // 聲明交換器(聲明了一個名字位exchangeA,修改fanout類型為direct類型的交換器?? channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 聲明??個隊列,在此采用臨時隊列 String queueName = channel.queueDeclare().getQueue(); // channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 隊列和交換器進行綁定,未設定路由鍵 channel.queueBind(queueName, EXCHANGE_NAME, "my.hello.*"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"utf-8"); System.out.println("[x] received‘"+message+"‘"); } }; channel.basicConsume(queueName, consumer); } }
運行後,兩個接收方均接收道理發送方發送的消息,盡管我們在兩個接收方配置的綁定鍵並不相同,但是其模糊匹配規則均可以匹配到發送方發送消息的路由鍵,如果有大量接收方,我們就可以通過設置不同的綁定鍵來有選擇的接收較多的消息或者是不接受消息。
官網英文版學習——RabbitMQ學習筆記(七)Topic