RabbitMQ三種Exchange模式詳解
文章目錄
前言
本文主要介紹RabbitMQ的三種Exchange模式,分別為Direct ,Topic ,Fanout 。
一、Direct
傳送到該Exchange上的訊息,會按照routing key路由到指定的Queue。該routing key必須和binding key完全匹配。
示例
可以看到,direct exchange X綁定了兩個Queue,其中,Q1由binding key ‘orange’繫結,Q2有兩個binding key,分別為‘black’和‘green’。
示例程式碼
先引入jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
消費端
// 通過ConnectionFactory 建立一個Connection
ConnectionFactory connectionFactory = new ConnectionFactory() ;
// 配置連線的host
connectionFactory.setHost("127.0.0.1");
// 配置連線的埠
connectionFactory.setPort(5672);
// 配置連線的virtualhost
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory. setNetworkRecoveryInterval(3000);
// 建立Connection
Connection connection = connectionFactory.newConnection();
// 建立Channel
Channel channel = connection.createChannel();
// 宣告
String exchangeName = "exchange";
String exchangeType = "direct";
String queueName = "queue";
String routingKey = "routingKey";
// 宣告Exchange
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 宣告佇列
channel.queueDeclare(queueName, false, false, false, null);
// 繫結Exchange和佇列
channel.queueBind(queueName, exchangeName, routingKey);
// 宣告消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 迴圈獲取訊息
while(true){
//獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg);
}
生產端
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "exchange";
String routingKey = "routingKey";
//5 傳送
String msg = "RabbitMQ Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
channel.close();
connection.close();
注意:Direct模式可以使用RabbitMQ自帶的Exchange:defaut exchange,可以不將Exchang進行任何的繫結(binding),訊息傳遞時,routing key必須和Queue名稱完全匹配, 訊息才會被路由,反之訊息將被丟棄
二、Topic
傳送到topic exchange的訊息的 routing key,必須是以點分隔的單詞列表。示例:
“ stock.usd.nyse ”,“ nyse.vmw ”,“quick.orange.rabbit ”。
routing key中可以包含任意多個單詞,最多255個位元組。
binding key也必須採用相同的形式。topic exchange背後的邏輯 類似於direct exchange-用特定routing key傳送的訊息將傳遞到所有匹配binding key繫結的佇列。但是,binding key有兩個重要的特殊情況:
● *(星號)可以代替一個單詞。
● #(雜湊)可以替代零個或多個單詞。
示例
在此示例中,我們建立了三個繫結:Q1與繫結鍵“ * .orange.* ”繫結,Q2與“ *.*.rabbit ”和“ lazy.# ”繫結。
其中:
routing key設定為“ quick.orange.rabbit ”的訊息將傳遞到兩個佇列。訊息“ lazy.orange.elephant ”也將傳送給他們兩個。
而“ quick.orange.fox ”只會進入第一個佇列,
“ lazy.brown.fox ”只會進入第二個佇列。
“ lazy.pink.rabbit ”將被傳遞到第二佇列一次,即使有兩個繫結匹配。
“ quick.brown.fox ”與任何繫結都不匹配,因此將被丟棄。
如果我們違約併發送一個或四個單詞的訊息,例如“orange”或“ quick.orange.male.rabbit ”,會發生什麼?好吧,這些訊息將不匹配任何繫結,並且將會丟失。
另一方面,“ lazy.orange.male.rabbit ”即使有四個單詞,也將匹配最後一個繫結,並將其傳送到第二個佇列。
程式碼示例
消費端
ConnectionFactory connectionFactory = new ConnectionFactory() ;
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "exchange";
String exchangeType = "topic";
String queueName = "queue";
//String routingKey = "user.*";
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);
// 迴圈獲取訊息
while(true){
// 獲取訊息,如果沒有訊息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("收到訊息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());
}
生產端
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();
//4 宣告
String exchangeName = "exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 傳送
String msg = "RabbitMQ Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
//channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
//channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
channel.close();
connection.close();
三、Fanout
傳送到fanout exchange的所有訊息都將廣播到它所繫結的(binding)所有佇列中,因此routing key在此將不起作用。
示例
所有傳送到exchange X的訊息都會廣播到與其繫結的兩個佇列中,且routing key在此將被忽略。
總結
以上三種模式就是最常使用的三種Exchange 模式。