RabbitMQ(十)——Routing 之訂閱模型-Topic
Topic簡介
Topic
型別的Exchange
與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在繫結Routing key
的時候使用萬用字元!這種模型Routingkey
一般都是由一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert
Topic 的要求
傳送到型別是 topic 交換機的訊息的 routing_key 不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞,比如說:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".這種型別的。當然這個單詞列表最多不能超過 255 個位元組。
在這個規則列表中,其中有兩個替換符需要注意:
*(星號)可以代替一個單詞
#(井號)可以替代零個或多個單詞
Topic 匹配案例
下圖繫結關係如下:
Q1-->繫結的是
- 中間帶 orange 帶 3 個單詞的字串(.orange.)
Q2-->繫結的是
- 最後一個單詞是 rabbit 的 3 個單詞(..rabbit)
- 第一個單詞是 lazy 的多個單詞(lazy.#)
上圖是一個佇列繫結關係圖,我們來看看他們之間資料接收情況是怎麼樣的
quick.orange.rabbit------->被佇列 Q1Q2 接收到
lazy.orange.elephant ------->被佇列 Q1Q2 接收到
quick.orange.fox ------->被佇列 Q1 接收到
lazy.brown.fox ------->被佇列 Q2 接收到
lazy.pink.rabbit-------> 雖然滿足兩個繫結但只被佇列 Q2 接收一次
quick.brown.fox ------->不匹配任何繫結不會被任何佇列接收到會被丟棄
quick.orange.male.rabbit-------> 是四個單詞不匹配任何繫結會被丟棄
lazy.orange.male.rabbit-------> 是四個單詞但匹配 Q2
實戰
消費者一
package com.study.rabbitmq.seven; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.study.rabbitmq.utils.RabbitMQUtils; //消費者 public class ReceiveLogsTopic01 { //交換機名稱 public static final String EXCHANGE_NAME = "topic_logs"; //接收訊息 public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //宣告交換機 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //宣告佇列 String queueName = "Q1"; channel.queueDeclare(queueName,false,false,false,null); //繫結通道 channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*"); System.out.println("等待接收訊息....."); DeliverCallback deliverCallback = (consumerTag, message) ->{ System.out.println(new String(message.getBody(),"UTF-8")); System.out.println("接收佇列:"+queueName + "繫結key:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {}); } }
消費者二
package com.study.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.study.rabbitmq.utils.RabbitMQUtils;
//消費者
public class ReceiveLogsTopic02 {
//交換機名稱
public static final String EXCHANGE_NAME = "topic_logs";
//接收訊息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//宣告交換機
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//宣告佇列
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
//繫結通道
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收訊息.....");
DeliverCallback deliverCallback = (consumerTag, message) ->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收佇列:"+queueName + "繫結key:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
}
}
生產者
package com.study.rabbitmq.seven;
import com.rabbitmq.client.Channel;
import com.study.rabbitmq.utils.RabbitMQUtils;
import java.util.HashMap;
import java.util.Map;
//生產者
public class EmitLogTopic {
//交換機名稱
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
/*
* Q1-->繫結的是
* 中間帶orange的3個單詞的通道(*.orange.*)
* Q2-->繫結的是
* 最後一個單詞是rabbit的3個單詞的通道(*.*.rabbit)
* 第一個單詞是lazy的多個單詞的通道(lazy.#)
* */
HashMap<String, String> bindingKeymap = new HashMap<>();
bindingKeymap.put("quick.orange.rabbit","被佇列Q1Q2接收到");
bindingKeymap.put("lazy.orange.elephant","被佇列Q1Q2接收到");
bindingKeymap.put("quick.orange.fox","被佇列Q1接收到");
bindingKeymap.put("lazy.brown.fox","被佇列Q2接收到");
bindingKeymap.put("lazy.pink.rabbit","雖然滿足兩個繫結,但只被佇列Q2接收一次");
bindingKeymap.put("quick.brown.fox","不匹配任何繫結不會被任何佇列接收到,會被丟棄");
bindingKeymap.put("quick.orange.male.rabbit","是四個單詞不匹配任何繫結,會被丟棄");
bindingKeymap.put("lazy.orange.male.rabbit","被Q2接收到");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeymap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("生產者發出訊息:" + message);
}
}
}
執行測試: