RabbitMQ入門:主題路由器(Topic Exchange)
上一篇博文中,我們使用direct exchange 代替了fanout exchange,這次我們來看下topic exchange。
一、Topic Exchange介紹
topic exchange和direct exchange類似,都是通過routing key和binding key進行匹配,不同的是topic exchange可以為routing key設置多重標準。
direct路由器類似於sql語句中的精確查詢;topic 路由器有點類似於sql語句中的模糊查詢。
還記得嗎?我們在《RabbitMQ入門:發布/訂閱(Publish/Subscribe)》中對exchange的分類進行過介紹:
Direct:完全根據key進行投遞的,例如,綁定時設置了routing key為”abc”,那麽客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。 Topic:對key進行模式匹配後進行投遞,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。 Fanout:不需要key,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。 Headers:我們可以不考慮它。
下面是官網給出的工作模型(P代表生產者,X代表exhange,紅色的Q代表隊列,C代表消費者):
我們來分析下這個模型。
它發送的消息是用來描述動物的。路由鍵有三個單詞:<speed>.<color>.<species>,第一個單詞描述了速度,第二個描述了顏色,第三個描述了物種。
有三個綁定鍵,Q1綁定鍵為*.orange.*(關註所有顏色為orange的動物); Q2的綁定鍵有兩個,分別是*.*.rabbit(關註所有的兔子)和lazy.#(關註所有速度為lazy的動物)。
因此,路由鍵為quick.orange.rabbit的消息將發送到Q1和Q2,路由鍵為quick.orange.fox的消息將發送到Q1,路由鍵為lazy.brown.fox的消息將發送到Q2。路由鍵為lazy.pink.rabbit的消息將發送到Q2,但是註意,它只會到達Q2一次,盡管它匹配了兩個綁定鍵。路由鍵為quick.brown.fox的消息因為不和任意的綁定鍵匹配,所以將會被丟棄。
如果有人手一抖發了個lazy.orange.male.rabbit這種四個單詞的,這個怎麽辦呢? 由於它和lazy.#匹配,因此將發送到Q2。
二、代碼示例
接下來我們看下代碼
- 生產者
public class LogTopicSender { // exchange名字 public static String EXCHANGE_NAME = "topicExchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.創建連接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道聲明topic類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 3.發送消息到指定的exchange,隊列指定為空,由exchange根據情況判斷需要發送到哪些隊列 String routingKey = "info"; // String routingKey = "log4j.error"; // String routingKey = "logback.error"; // String routingKey = "log4j.warn"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉連接 if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
- 消費者
public class LogTopicReciver { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.創建連接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道聲明topic類型的exchange channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 3.創建隨機名字的隊列 String queueName = channel.queueDeclare().getQueue(); // 4.建立exchange和隊列的綁定關系 String[] bindingKeys = { "#" }; // String[] bindingKeys = { "log4j.*", "#.error" }; // String[] bindingKeys = { "*.error" }; // String[] bindingKeys = { "log4j.warn" }; for (int i = 0; i < bindingKeys.length; i++) { channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]); System.out.println(" **** LogTopicReciver keep alive ,waiting for " + bindingKeys[i]); } // 5.通過回調生成消費者並進行監聽 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 獲取消息內容然後處理 String msg = new String(body, "UTF-8"); System.out.println("*********** LogTopicReciver" + " get message :[" + msg + "]"); } }; // 6.消費消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
- 啟動消費者,作為消費者1
- 分別將String[] bindingKeys = { "#" };改為String[] bindingKeys = { "log4j.*", "#.error" };/String[] bindingKeys = { "*.error" };/String[] bindingKeys = { "log4j.warn" };,然後啟動作為消費者2、消費者3、消費者4
- 啟動4次生產者,routing key分別為String routingKey = "info";、String routingKey = "log4j.error";、String routingKey = "logback.error";、String routingKey = "log4j.warn";
- 觀察控制臺log
生產者: product send a msg: hello rabbitmq, I am info product send a msg: hello rabbitmq, I am log4j.error product send a msg: hello rabbitmq, I am logback.error product send a msg: hello rabbitmq, I am log4j.warn 消費者1: **** LogTopicReciver keep alive ,waiting for # *********** LogTopicReciver get message :[ hello rabbitmq, I am info] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消費者2: **** LogTopicReciver keep alive ,waiting for log4j.* **** LogTopicReciver keep alive ,waiting for #.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消費者3: **** LogTopicReciver keep alive ,waiting for *.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] 消費者4: **** LogTopicReciver keep alive ,waiting for log4j.warn *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] - 觀察RabbitMQ管理頁面
RabbitMQ入門:主題路由器(Topic Exchange)