1. 程式人生 > 其它 >RabbitMQ三種Exchange模式詳解

RabbitMQ三種Exchange模式詳解

技術標籤:rabbitmqrabbitmq

文章目錄


前言

本文主要介紹RabbitMQ的三種Exchange模式,分別為Direct ,Topic ,Fanout 。


一、Direct

傳送到該Exchange上的訊息,會按照routing key路由到指定的Queue。該routing key必須和binding key完全匹配。

示例

Direct
可以看到,direct exchange X綁定了兩個Queue,其中,Q1由binding key ‘orange’繫結,Q2有兩個binding key,分別為‘black’和‘green’。

在這樣的設定中,將使用routing key ‘orange’將要釋出到交換機的訊息 路由到佇列Q1。routing key 為‘black’或‘green’的訊息將轉到Q2。所有其他訊息將被丟棄。

示例程式碼

先引入jar包

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>3.6.5</version>
</dependency>	

消費端

// 通過ConnectionFactory 建立一個Connection 
ConnectionFactory connectionFactory = new ConnectionFactory() ; // 配置連線的host connectionFactory.setHost("127.0.0.1"); // 配置連線的埠 connectionFactory.setPort(5672); // 配置連線的virtualhost connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.
setNetworkRecoveryInterval(3000); // 建立Connection Connection connection = connectionFactory.newConnection(); // 建立Channel Channel channel = connection.createChannel(); // 宣告 String exchangeName = "exchange"; String exchangeType = "direct"; String queueName = "queue"; String routingKey = "routingKey"; // 宣告Exchange channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 宣告佇列 channel.queueDeclare(queueName, false, false, false, null); // 繫結Exchange和佇列 channel.queueBind(queueName, exchangeName, routingKey); // 宣告消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 引數:佇列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); // 迴圈獲取訊息 while(true){ //獲取訊息,如果沒有訊息,這一步將會一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到訊息:" + msg); }

生產端

//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();  
//4 宣告
String exchangeName = "exchange";
String routingKey = "routingKey";
//5 傳送
String msg = "RabbitMQ Direct Exchange Message ... ";
channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());

channel.close();  
connection.close();  

注意:Direct模式可以使用RabbitMQ自帶的Exchange:defaut exchange,可以不將Exchang進行任何的繫結(binding),訊息傳遞時,routing key必須和Queue名稱完全匹配, 訊息才會被路由,反之訊息將被丟棄

二、Topic

傳送到topic exchange的訊息的 routing key,必須是以點分隔的單詞列表。示例:
“ stock.usd.nyse ”,“ nyse.vmw ”,“quick.orange.rabbit ”。
routing key中可以包含任意多個單詞,最多255個位元組。
binding key也必須採用相同的形式。topic exchange背後的邏輯 類似於direct exchange-用特定routing key傳送的訊息將傳遞到所有匹配binding key繫結的佇列。但是,binding key有兩個重要的特殊情況:

● *(星號)可以代替一個單詞。
● #(雜湊)可以替代零個或多個單詞。

示例

topic
在此示例中,我們建立了三個繫結:Q1與繫結鍵“ * .orange.* ”繫結,Q2與“ *.*.rabbit ”和“ lazy.# ”繫結。

其中:
routing key設定為“ quick.orange.rabbit ”的訊息將傳遞到兩個佇列。訊息“ lazy.orange.elephant ”也將傳送給他們兩個。
而“ quick.orange.fox ”只會進入第一個佇列,
“ lazy.brown.fox ”只會進入第二個佇列。
“ lazy.pink.rabbit ”將被傳遞到第二佇列一次,即使有兩個繫結匹配。
“ quick.brown.fox ”與任何繫結都不匹配,因此將被丟棄。

如果我們違約併發送一個或四個單詞的訊息,例如“orange”或“ quick.orange.male.rabbit ”,會發生什麼?好吧,這些訊息將不匹配任何繫結,並且將會丟失。

另一方面,“ lazy.orange.male.rabbit ”即使有四個單詞,也將匹配最後一個繫結,並將其傳送到第二個佇列。

程式碼示例

消費端

ConnectionFactory connectionFactory = new ConnectionFactory() ;  
        
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();  
//4 宣告
String exchangeName = "exchange";
String exchangeType = "topic";
String queueName = "queue";
//String routingKey = "user.*";
String routingKey = "user.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

QueueingConsumer consumer = new QueueingConsumer(channel);
//	引數:佇列名稱、是否自動ACK、Consumer
channel.basicConsume(queueName, true, consumer);

//	迴圈獲取訊息  
while(true){  
    //	獲取訊息,如果沒有訊息,這一步將會一直阻塞  
    Delivery delivery = consumer.nextDelivery();  
    String msg = new String(delivery.getBody());    
    System.out.println("收到訊息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());  
} 

生產端

//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 建立Connection
Connection connection = connectionFactory.newConnection();
//3 建立Channel
Channel channel = connection.createChannel();  
//4 宣告
String exchangeName = "exchange";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 傳送
String msg = "RabbitMQ Topic Exchange Message ...";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
//channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); 	
//channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
channel.close();  
connection.close();  

三、Fanout

傳送到fanout exchange的所有訊息都將廣播到它所繫結的(binding)所有佇列中,因此routing key在此將不起作用。

示例

fanout
所有傳送到exchange X的訊息都會廣播到與其繫結的兩個佇列中,且routing key在此將被忽略。

總結

以上三種模式就是最常使用的三種Exchange 模式。