RabbitMQ指南之五:主題交換器(Topic Exchange)
在上一章中,我們完善了我們的日誌系統,用direct交換器替換了fanout交換器,使得我們可以有選擇性地接收消息。盡管如此,仍然還有限制:不能基於多個標準進行路由。在我們的日誌系統中,我們可能不僅希望根據日誌等級訂閱日誌,還希望根據日誌來源訂閱日誌。這個概念來自於unix工具syslog,它不僅可以根據日誌等級(info/warn/crit...)來路由日誌,同時還可以根據設備(auth/cron/kern...)來路由日誌。這將更加靈活,我們可能希望只監聽來自‘cron‘的error級別日誌,同時又要接收來自‘kern‘的所有級別的日誌。我們的日誌系統如果要實現這個功能,就需要使用到另外一種交換器:主題交換器(Topic Exchange)。
1、主題交換器(Topic Exchange)
發送到主題交換器的消息不能有任意的routing key,必須是由點號分開的一串單詞,這些單詞可以是任意的,但通常是與消息相關的一些特征。比如以下是幾個有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的單詞可以有很多,最大限制是255 bytes。
binding key必須與routing key模式一樣。Topic交換器的邏輯與direct交換器有點相似:使用特定路由鍵發送的消息將被發送到所有使用匹配綁定鍵綁定的隊列,然而,綁定鍵有兩個特殊的情況,如下:
- * 表示匹配任意一個單詞
- # 表示匹配任意一個或多個單詞
下圖很好地表示這這兩個通配符的用法:
在這個例子中,我們將發送所有跟動物有關的消息,這些消息將會發送到由三個單詞,兩個點號組成的routing key,第一個單詞了表示的是速度,第二個單詞表示顏色,第三個單詞表示種類:
"<speed>.<colour>.<species>"。
我們創建三個綁定關系:隊列Q1綁定到綁定鍵*.orange.* ,隊列Q2綁定到*.*.rabbit和lazy.#。
總結下來就是:
- 隊列Q1對橘黃色(orange)顏色的所有動物感興趣;
- 隊列Q2對所有的兔子(rabbit)和所有慢吞吞(lazy)的動物感興趣。
一個路由為 "quick.orange.rabbit"的消息,將會被轉發到這兩個隊列,路由為"lazy.orange.elephant"的消息也被轉發給這兩個隊列,路由為 "quick.orange.fox"的消息將只被轉發到Q1隊列,路由為 "lazy.brown.fox"的消息將只被轉發到Q2隊列。"lazy.pink.rabbit" 只被轉發到Q2隊列一次(雖然它匹配綁定鍵*.*.rabbit和lazy.#),路由為 "quick.brown.fox"的消息與任何一個綁定鍵都不匹配,因此將會被丟棄。
如果我們發送的消息的的路由是由一個單詞“orangle"或4個單詞”quick.orangle.male.rabbit“將會怎樣?會因為與任何一個綁定鍵不匹配而被丟棄。
另一方面,路由為 "lazy.orange.male.rabbit"的消息,因為匹配"lazy.#"綁定鍵,因而會被轉發到Q2隊列。
Topic交換器非常強大,可以像其他類型的交換器一樣工作:
當一個隊列的綁定鍵是"#"是,它將會接收所有的消息,而不再考慮所接收消息的路由鍵,就像是fanout交換器一樣;
當一個隊列的綁定鍵沒有用到”#“和”*“時,它又像direct交換一樣工作。
2、完整的代碼
下面是在我們日誌系統中采用Topic交換器的完整代碼,我們要發送的日誌消息的路由由兩個單詞組成:"<facility>.<severity>"。
EmitLogTopic.java
1 import com.rabbitmq.client.BuiltinExchangeType; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class EmitLogTopic { 7 8 private final static String EXCHANGE_NAME = "topic_logs"; 9 10 public static void main(String[] args) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setHost("localhost); 14 15 try(Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel()) { 17 18 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 19 20 String message = "A critical kernel error"; 21 String routingKey = "kern.critical"; 22 23 channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8")); 24 25 System.out.println(" [x] Sent ‘" + routingKey + "‘:‘" + message + "‘"); 26 } 27 } 28 }
ReceiveLogsTopic.java
1 import com.rabbitmq.client.*; 2 3 public class ReceiveLogsTopic { 4 5 private final static String EXCHANGE_NAME = "topic_logs"; 6 7 public static void main(String[] args) throws Exception { 8 9 ConnectionFactory factory = new ConnectionFactory(); 10 factory.setHost("localhost"); 11 12 Connection connection = factory.newConnection(); 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); 16 17 String queueName = channel.queueDeclare().getQueue(); 18 19 if (args.length < 1) { 20 System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); 21 System.exit(1); 22 } 23 24 for (String bindingKey : args) { 25 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); 26 } 27 28 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 29 30 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 31 String message = new String(delivery.getBody(), "UTF-8"); 32 System.out.println(" [x] Received ‘" + 33 delivery.getEnvelope().getRoutingKey() + "‘:‘" + message + "‘"); 34 }; 35 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); 36 } 37 }
啟動4個接收者,分別傳入綁定鍵:#、kern.*、*.critical、kern.* *.critical。
啟動生產者:發送一條路由為“kern.critical”的消息,消息內容為:“A critical kernel error”,分別查看接收情況:
可以看到,所有綁定鍵的隊列都正常接收到了消息。
3、SpringBoot實現
工程如下圖:
一、生產者
application.properties
#RabbitMq spring.rabbitmq.host=localhost rabbitmq.exchange.topic=topic_logs2 rabbitmq.exchange.topic.routing.key=kern.critical
EmitLogTopic.java
1 import org.springframework.amqp.core.AmqpTemplate; 2 import org.springframework.beans.factory.annotation.Autowired; 3 import org.springframework.beans.factory.annotation.Value; 4 import org.springframework.stereotype.Component; 5 6 @Component 7 public class EmitLogTopic { 8 9 @Value("${rabbitmq.exchange.topic}") 10 private String exchangeName; 11 12 @Value("${rabbitmq.exchange.topic.routing.key}") 13 private String routingKey; 14 15 @Autowired 16 private AmqpTemplate template; 17 18 public void sendMessage(Object message) { 19 System.out.println("發送消息:" + message); 20 template.convertAndSend(exchangeName,routingKey,message); 21 } 22 }
EmitLogTopicRunner.java
1 import org.springframework.beans.factory.annotation.Autowired; 2 import org.springframework.boot.ApplicationArguments; 3 import org.springframework.boot.ApplicationRunner; 4 import org.springframework.stereotype.Component; 5 6 @Component 7 public class EmitLogTopicRunner implements ApplicationRunner { 8 9 @Autowired 10 private EmitLogTopic emitLogTopic; 11 12 @Override 13 public void run(ApplicationArguments args) throws Exception { 14 emitLogTopic.sendMessage("A critical kernel error"); 15 } 16 }
二、消費者
application.properties
#RabbitMq spring.rabbitmq.host=localhost rabbitmq.exchange.topic=topic_logs2 rabbitmq.topic.queue=topic_queue rabbitmq.exchange.topic.binding.key=kern.critical server.port=8081
ReceiveLogsTopic.java
1 import org.springframework.amqp.core.ExchangeTypes; 2 import org.springframework.amqp.rabbit.annotation.*; 3 import org.springframework.stereotype.Component; 4 5 @Component 6 @RabbitListener( 7 bindings = @QueueBinding( 8 value = @Queue(value = "${rabbitmq.topic.queue}",autoDelete = "true"), 9 exchange = @Exchange(value = "${rabbitmq.exchange.topic}",type = ExchangeTypes.TOPIC), 10 key = {"#","kern.*","*.critical"} 11 ) 12 ) 13 public class ReceiveLogsTopic { 14 15 @RabbitHandler 16 public void reeive(Object message) { 17 System.out.println("接收到消息:" + message); 18 } 19 }
啟動查看控制臺輸出:
生者產輸出:
消費者輸出:
至此,RabbitMq的主題交換器講解完了,在下一章中將會講解RabbitMq的RPC。
RabbitMQ指南之五:主題交換器(Topic Exchange)