RabbitMq-topic萬用字元模式(五)
阿新 • • 發佈:2021-10-19
一、topics萬用字元模式:
Topic 萬用字元型別與 Direct路由 相比,都是可以根據 RoutingKey 把訊息路由到不同的佇列。只不過 Topic 型別 Exchange 可以讓佇列在繫結 Routing key 的時候使用萬用字元!
1.1 topic模式的Routingkey
Routingkey 一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert
萬用字元規則:
# : 匹配一個或多個詞
* : 匹配不多不少恰好1個詞
舉例:
item.# :能夠匹配 item.insert.abc 或者 item.insert item.* :只能匹配 item.insert
1.2 topic模式示例圖
圖解:
紅色Queue:繫結的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到
黃色Queue:繫結的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配
二、topic萬用字元模式需求
編寫生產者、消費者程式碼並測試瞭解Topics萬用字元模式的特點
2.1 步驟:
- 生產者:傳送包含有item.insert、item.update,item.delete的3中路由key訊息
- 消費者1:監聽的佇列繫結到交換機的路由key為:item.update,item.delete
- 消費者2:監聽的佇列繫結到交換機的路由key為:item.*
2.2 程式碼示例
生產者程式碼Producer:
package com.study.rabbitmq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.study.rabbitmq.util.ConnectionUtil; //topic路由模式 傳送訊息 public class Producer { //交換機名稱 static final String TOPIC_EXCHANGE = "topic_exchange"; //佇列名稱 static final String TOPIC_QUEUE_1 = "topic_queue_1"; static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static void main(String[] args) throws Exception { // 1. 建立連線 Connection connection = ConnectionUtil.getConnection(); // 2. 建立頻道; Channel channel = connection.createChannel(); // 3.宣告交換機,引數1:交換機名稱,引數2:交換機型別(FANOUT廣播型別、direct 定向路由,topic 萬用字元) channel.exchangeDeclare(TOPIC_EXCHANGE , BuiltinExchangeType.TOPIC); //4. 傳送訊息; String message = "新增商品(萬用字元模式)---routingkey 為 insert"; /** * 引數1:交換機名稱;如果沒有則指定空字串(表示使用預設的交換機) * 引數2:路由key,簡單模式中可以使用佇列名稱 * 引數3:訊息其它屬性 * 引數4:訊息內容 */ channel.basicPublish(TOPIC_EXCHANGE,"item.insert",null,message.getBytes()); System.out.println("已傳送訊息:" + message); message = "更新商品(萬用字元模式)---routingkey 為 update"; channel.basicPublish(TOPIC_EXCHANGE,"item.update",null,message.getBytes()); System.out.println("已傳送訊息:" + message); message = "刪除商品(萬用字元模式)---routingkey 為 delete"; channel.basicPublish(TOPIC_EXCHANGE,"item.delete",null,message.getBytes()); System.out.println("已傳送訊息:" + message); //5. 關閉資源 channel.close(); connection.close(); } }
消費者1Consumer1-程式碼:
package com.study.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
/**
* topic路由模式;消費者接收訊息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
// 1. 建立連線;(抽取一個獲取連線的工具類)
Connection connection = ConnectionUtil.getConnection();
// 2. 建立頻道;
Channel channel = connection.createChannel();
//3 宣告交換機
channel.exchangeDeclare(com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
// 4. 宣告佇列;
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
* 引數3:是否獨佔本連線
* 引數4:是否在不使用的時候佇列自動刪除
* 引數5:其它引數
*/
channel.queueDeclare(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, true, false, false, null);
//5 佇列繫結到交換機上
channel.queueBind(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,"item.update");
channel.queueBind(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,"item.delete");
// 6. 建立消費者(接收訊息並處理訊息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//接收到的訊息
System.out.println("消費者1---接收到的訊息為:" + new String(body, "utf-8"));
}
};
// 6. 監聽佇列 (需要持續監聽佇列訊息,所以不要關閉資源)
/**
* 引數1:佇列名
* 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
* 如果設定為false則需要手動確認
* 引數3:消費者
*/
channel.basicConsume(Producer.TOPIC_QUEUE_1,true,defaultConsumer);
//不關閉資源,應該一直監聽訊息
// channel.close();
// connection.close();
}
}
消費者2-Consumer2 程式碼:
package com.study.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
/**
* topic路由模式;消費者接收訊息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
// 1. 建立連線;(抽取一個獲取連線的工具類)
Connection connection = ConnectionUtil.getConnection();
// 2. 建立頻道;
Channel channel = connection.createChannel();
//3 宣告交換機
channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
// 4. 宣告佇列;
/**
* 引數1:佇列名稱
* 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
* 引數3:是否獨佔本連線
* 引數4:是否在不使用的時候佇列自動刪除
* 引數5:其它引數
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//5 佇列繫結到交換機上
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"item.*");
// 6. 建立消費者(接收訊息並處理訊息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機
System.out.println("交換機為:" + envelope.getExchange());
//訊息id
System.out.println("訊息id為:" + envelope.getDeliveryTag());
//接收到的訊息
System.out.println("消費者2---接收到的訊息為:" + new String(body, "utf-8"));
}
};
// 6. 監聽佇列 (需要持續監聽佇列訊息,所以不要關閉資源)
/**
* 引數1:佇列名
* 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
* 如果設定為false則需要手動確認
* 引數3:消費者
*/
channel.basicConsume(Producer.TOPIC_QUEUE_2,true,defaultConsumer);
//不關閉資源,應該一直監聽訊息
// channel.close();
// connection.close();
}
}
先執行消費者1 、2 再執行生產者,
idea結果: 消費者1佇列 接收:路由key為:item.update,item.delete;消費者2佇列 接收:路由key為:item.update,item.delete,item.insert
rabbitmq後臺管理介面結果
三、topic萬用字元總結:
Topics萬用字元模式:可以根據路由key將訊息傳遞到對應路由key的佇列;
佇列繫結到交換機的路由key可以有多個;
萬用字元模式中路由key可以使用 *
和 #
;使用了萬用字元模式之後對於路由Key的配置更加靈活。