rabbitmq--topic主題路由模型
上一篇,我們介紹了直接路由模型,這一篇我們介紹下主題路由模型。
就如同上一篇所說,直接路由模型和主題路由模型非常相似,它們唯一區別在於:直接路由模型對於路由key是精確匹配的,而主題路由模型對於路由key是模糊匹配的(使用萬用字元 )。
因此當我們瞭解上一篇中的直接路由模型,也不難理解本篇中的主題路由模型了。如圖:
那麼topic中的路由key的匹配規則是什麼呢?
topic模式中路由key中存在兩種萬用字元 * 和 #
- * (star) can substitute for exactly one word. *匹配明確的一個單詞
- # (hash) can substitute for zero or more words. #匹配0到多個單詞
topic模式中若路由key為"#" ,則表示對所有佇列感興趣 ,相當於fanout
topic模式中若路由key中不含有*和#字元時,則表示只對精確匹配的路由key佇列感興趣,則相對於routing
示例程式碼:
生產者
package com.tingcream.rabbitmq.topic; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class TopicLogProducer { private static String EXCHANGE_NAME="topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); //主機 埠 vhost 使用者名稱 密碼 factory.setHost("192.168.9.102"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq123"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setVirtualHost("/"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); // lazy.black.elephant 懶惰的黑色的象 // lazy.white.sheep 懶惰的白色的羊 // quick.black.horse 快速的黑色的馬 // quick.red.horse 快速的紅色的馬 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); for (int i=0;i<10;i++){ String message="你好,這是lazy.black.elephant級別訊息 "+i; channel.basicPublish(EXCHANGE_NAME,"lazy.black.elephant",null,message.getBytes()); System.out.println("TopicLogProducer Send: " + message ); } for (int i=0;i<10;i++){ String message="你好,這是lazy.white.sheep級別訊息 "+i; channel.basicPublish(EXCHANGE_NAME,"lazy.white.sheep",null,message.getBytes()); System.out.println("TopicLogProducer Send: " + message ); } for (int i=0;i<10;i++){ String message="你好,這是quick.black.horse級別訊息 "+i; channel.basicPublish(EXCHANGE_NAME,"quick.black.horse",null,message.getBytes()); System.out.println("TopicLogProducer Send: " + message ); } for (int i=0;i<10;i++){ String message="你好,這是quick.red.horse級別訊息 "+i; channel.basicPublish(EXCHANGE_NAME,"quick.red.horse",null,message.getBytes()); System.out.println("TopicLogProducer Send: " + message ); } channel.close(); connection.close(); } }
消費者A
package com.tingcream.rabbitmq.topic; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class TopicLogReceiverA { private static String EXCHANGE_NAME="topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); //主機 埠 vhost 使用者名稱 密碼 factory.setHost("192.168.9.102"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq123"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setVirtualHost("/"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //獲取一個隨機的佇列名稱 String queueName = channel.queueDeclare().getQueue(); // 繫結routingKey "#" 表示對所有佇列感興趣 ,相當於fanout // 繫結routingKey 若不含"#"和"*",表示只對精確匹配的路由key佇列感興趣,則相對於routing // channel.queueBind(queue, exchange, routingKey) channel.queueBind(queueName, EXCHANGE_NAME, "*.black.*"); System.out.println("TopicLogReceiverA Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println( "TopicLogReceiverA接收到訊息:" + message ); } }; channel.basicConsume(queueName, true, consumer); } }
消費者B
package com.tingcream.rabbitmq.topic;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class TopicLogReceiverB {
private static String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主機 埠 vhost 使用者名稱 密碼
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//獲取一個隨機的佇列名稱
String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queue, exchange, routingKey) //bind方法可以呼叫多次,繫結多個
channel.queueBind(queueName, EXCHANGE_NAME, "quick.#");
channel.queueBind(queueName, EXCHANGE_NAME, "*.white.*");
System.out.println("TopicLogReceiverB Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println( "TopicLogReceiverB接收到訊息:" + message );
}
};
channel.basicConsume(queueName, true, consumer);
}
}
依次啟動消費者A、B和生產者。
消費者A輸出
TopicLogReceiverA Waiting for messages
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 0
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 1
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 2
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 3
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 4
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 5
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 6
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 7
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 8
TopicLogReceiverA接收到訊息:你好,這是lazy.black.elephant級別訊息 9
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 0
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 1
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 2
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 3
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 4
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 5
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 6
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 7
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 8
TopicLogReceiverA接收到訊息:你好,這是quick.black.horse級別訊息 9
消費者B輸出:
TopicLogReceiverB Waiting for messages
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 0
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 1
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 2
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 3
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 4
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 5
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 6
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 7
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 8
TopicLogReceiverB接收到訊息:你好,這是lazy.white.sheep級別訊息 9
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 0
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 1
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 2
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 3
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 4
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 5
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 6
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 7
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 8
TopicLogReceiverB接收到訊息:你好,這是quick.black.horse級別訊息 9
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 0
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 1
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 2
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 3
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 4
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 5
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 6
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 7
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 8
TopicLogReceiverB接收到訊息:你好,這是quick.red.horse級別訊息 9
管理後臺中 exchange標籤項中多出了一個topic交換機,名叫 topic_logs ,這就是我們剛宣告的topic型別交換機。