RabbitMQ系列--主題(Topic)
阿新 • • 發佈:2020-12-20
技術標籤:中介軟體# RabbitMQ
上一篇部落格中,我們使用了direct型別的交換機,使得消費者有能力進行選擇性的訊息。但是仍然存在一些侷限性:它不能夠基於多重條件進行路由選擇。 我們可以使用Topic型別的交換機解決這個問題。
Topic型別的Exchange與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在繫結Routing key的時候可以使用萬用字元!這種模型的Routingkey一般都是由一共或多個單片語成,多個單詞之間以"."分割,例如:item.insert;
萬用字元
- #匹配0個或多個詞
- *匹配不多不少恰好1個詞
比如上圖中的 * .orange. * 與Q1佇列進行了繫結,* .* . rabbit和lazy.#與Q2佇列進行了繫結, 那麼生產者傳送路由key為user.orange.key會被消費者C1接收到。 傳送路由key為user.orange.rabbit的訊息,會被消費者C1和消費者C2都接收到。 如果傳送lazy或lazy.aa或lazy.aa.aa 都會被消費者C2接收到。
接下來讓我們用程式碼來驗證下;
1.定義生產者
public class Provider {
public static void main(String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
//獲取通道
Channel channel = connection.createChannel();
//將通道宣告指定交換機
//引數1:交換機名稱 引數2:交換機的型別 direct 路由型別
//沒有交換機會建立一共名為logs的交換機
channel.exchangeDeclare("topics" ,"topic");
//路由Key
String routerKey = "user.orange.key";
//傳送訊息
channel.basicPublish("topics",routerKey,null,("這是基於topics的 [ "+ routerKey+"]的訊息").getBytes());
//關閉連線和通道
RabbitMQUtils.closeChannelAndConnection(channel,connection);
}
}
2.定義消費者1
public class Customer1 {
public static void main(String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("topics","topic");
//建立一個臨時的、唯一的佇列
//返回的是 臨時佇列名
String queueName = channel.queueDeclare().getQueue();
//繫結交換機和佇列
//引數1: 佇列名稱 引數2:交換機名稱 引數3:路由名稱
channel.queueBind(queueName,"topics","*.orange.*");
//消費訊息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1:"+new String(body));
}
});
}
}
3.定義消費者2
public class Customer2 {
public static void main(String[] args) throws IOException {
//獲取連線物件
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
//通道繫結交換機
channel.exchangeDeclare("topics","topic");
//建立一個臨時的、唯一的佇列
//返回的是 臨時佇列名
String queueName = channel.queueDeclare().getQueue();
//繫結交換機和佇列
//引數1: 佇列名稱 引數2:交換機名稱 引數3:路由名稱
channel.queueBind(queueName,"topics","*.*.rabbit");
channel.queueBind(queueName,"topics","lazy.#");
//消費訊息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2:"+new String(body));
}
});
}
}
先執行消費者1和消費者2,在執行生產者。 可以發現只有消費者1接收到了訊息
修改生產者的路由Key
String routerKey = "user.orange.rabbit";
再次執行生產者,可以發現消費者1和消費者2都收到了訊息
修改生產者路由key
String routerKey = "lazy";
只有消費者2收到了訊息。
可以看到,我們通過使用topic型別的交換機,成功實現了多重條件進行路由選擇。