RabbitMq學習(二)DirectExchange在springboot的用法
依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xquant</groupId> <artifactId>xquant-rabbitmq-send</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>xquant-rabbitmq-send</name> <description>xquant-rabbitmq-send</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置
在這個配置中,分以下幾步: 第一步:構建交換機DirectExchange 第二步:構建佇列Queue,這裡我定義了兩個:test1,test2 第三步:繫結交換機和佇列,這裡我用direct_key1這個bingdingKey來繫結DirectExchange和test1,用direct_key2這個bingdingKey來繫結DirectExchange和test2 類似於下圖: 第四步:構建ConnectionFactory,配置連結和使用者名稱密碼,例項化RabbitTemplate 程式碼如下:
package com.xquant.rabbitmq.send.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author chunhui.tan * @version 建立時間:2018年11月14日 下午1:29:21 * */ @Configuration public class DirectMqConfig { /** * 交換機名稱 */ public static final String DIRECT_EXCHANGE_NAME = "direct_exchange"; /** * 繫結key,交換機繫結佇列時需要指定 */ public static final String BINGDING_KEY_TEST1 = "direct_key1"; public static final String BINGDING_KEY_TEST2 = "direct_key2"; /** * 佇列名稱 */ public static final String QUEUE_TEST1 = "test1"; public static final String QUEUE_TEST2 = "test2"; /** * 構建DirectExchange交換機 * * @return */ @Bean public DirectExchange directExchange() { // 支援持久化,長期不用補刪除 return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false); } /** * 構建序列 * * @return */ @Bean public Queue test1Queue() { // 支援持久化 return new Queue(QUEUE_TEST1, true); } @Bean public Queue test2Queue() { // 支援持久化 return new Queue(QUEUE_TEST2, true); } /** * 繫結交交換機和 * * @return */ @Bean public Binding test1Binding() { return BindingBuilder.bind(test1Queue()).to(directExchange()).with(BINGDING_KEY_TEST1); } @Bean public Binding test2Binding() { return BindingBuilder.bind(test2Queue()).to(directExchange()).with(BINGDING_KEY_TEST2); } /** * 配置 * * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); return connectionFactory; } /** * 例項化操作模板 * * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
生產者
為了方便測試,寫一個介面,可以看出在傳送訊息時需要指定交換機和路由key。 第一個介面傳送給佇列test1 第二個介面傳送給佇列test2
package com.xquant.rabbitmq.send.mq; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author chunhui.tan * @version 建立時間:2018年11月14日 下午4:17:31 * */ @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; /** * 給test1佇列發訊息 * * @return */ @GetMapping("/sendMessage1") public Object sendMessage1() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentEncoding("UTF-8"); Message message = new Message("你好,這是發給test1佇列的訊息".getBytes(), messageProperties); rabbitTemplate.send(DirectMqConfig.DIRECT_EXCHANGE_NAME, DirectMqConfig.BINGDING_KEY_TEST1, message); return "ok"; } /** * 給test2佇列發訊息 * * @return */ @GetMapping("/sendMessage2") public Object sendMessage2() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentEncoding("UTF-8"); Message message = new Message("你好,這是發給test2佇列的訊息".getBytes(), messageProperties); rabbitTemplate.send(DirectMqConfig.DIRECT_EXCHANGE_NAME, DirectMqConfig.BINGDING_KEY_TEST2, message); return "ok"; } }
消費者
package com.xquant.rabbitmq.send.mq;
import java.io.UnsupportedEncodingException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author chunhui.tan
* @version 建立時間:2018年11月14日 下午4:21:24
*
*/
@Component
public class Consumer {
/**
* 監聽test1佇列
*
* @param message
* @throws UnsupportedEncodingException
*/
@RabbitListener(queues = DirectMqConfig.QUEUE_TEST1)
public void consumeMessage1(Message message) throws UnsupportedEncodingException {
System.out.println("這是監聽test1得到的訊息:======" + new String(message.getBody(), "utf-8"));
}
/**
* 監聽test2佇列
*
* @param message
* @throws UnsupportedEncodingException
*/
@RabbitListener(queues = DirectMqConfig.QUEUE_TEST2)
public void consumeMessage2(Message message) throws UnsupportedEncodingException {
System.out.println("這是監聽test1得到的訊息:======" + new String(message.getBody(), "utf-8"));
}
}
啟動專案測試
多個繫結情況
以上演示的是一個bingdingKey繫結一個佇列的情況,其實我們可以用一個bingdingKey繫結多個佇列。 配置程式碼如下:
package com.xquant.rabbitmq.send.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author chunhui.tan
* @version 建立時間:2018年11月14日 下午1:29:21
*
*/
@Configuration
public class DirectMqConfig {
/**
* 交換機名稱
*/
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";
/**
* 繫結key,交換機繫結佇列時需要指定
*/
public static final String BINGDING_KEY_TEST1 = "direct_key1";
public static final String BINGDING_KEY_TEST2 = "direct_key2";
/**
* 佇列名稱
*/
public static final String QUEUE_TEST1 = "test1";
public static final String QUEUE_TEST2 = "test2";
/**
* 構建DirectExchange交換機
*
* @return
*/
@Bean
public DirectExchange directExchange() {
// 支援持久化,長期不用補刪除
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
/**
* 構建序列
*
* @return
*/
@Bean
public Queue test1Queue() {
// 支援持久化
return new Queue(QUEUE_TEST1, true);
}
@Bean
public Queue test2Queue() {
// 支援持久化
return new Queue(QUEUE_TEST2, true);
}
/**
* 繫結交交換機和
*
* @return
*/
@Bean
public Binding test1Binding() {
return BindingBuilder.bind(test1Queue()).to(directExchange()).with(BINGDING_KEY_TEST1);
}
/**
* 修改這裡,將繫結的key變成BINGDING_KEY_TEST1,使兩個佇列通過同一個bindingkey繫結到交換機
*
* @return
*/
@Bean
public Binding test2Binding() {
return BindingBuilder.bind(test2Queue()).to(directExchange()).with(BINGDING_KEY_TEST1);
}
/**
* 配置
*
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("47.110.34.160", 5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
return connectionFactory;
}
/**
* 例項化操作模板
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
變化很小,只是test2Binding()方法中將繫結的key變成BINGDING_KEY_TEST1,使兩個佇列通過同一個bindingkey繫結到交換機。 生產者介面還是不變 消費者部分程式碼也不變 先web頁面刪除交換機和佇列 然後啟動專案測試 檢視web頁面: 通過同一個bindingkey將兩個佇列繫結到同一個交換機 生產者介面中,sendMessage2()方法中是這樣的:
/**
* 給test2佇列發訊息
*
* @return
*/
@GetMapping("/sendMessage2")
public Object sendMessage2() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentEncoding("UTF-8");
Message message = new Message("你好,這是發給test2佇列的訊息".getBytes(), messageProperties);
rabbitTemplate.send(DirectMqConfig.DIRECT_EXCHANGE_NAME, DirectMqConfig.BINGDING_KEY_TEST2, message);
return "ok";
}
顯然這裡在發訊息時指定的路由key是direct_key2,所以這條訊息應該會丟失的。 我們訪問sendMessage1介面,控制檯列印如下: 可以看到,分別監聽test1和test2的監聽器都監聽到了通過y direct_key1這個路由key傳送的訊息。
訪問sendMessage2介面, 控制檯空空如也,原本通過direct_key2這個路由鍵傳送的訊息丟失了,因為沒有佇列通過這個路由鍵來繫結交換機。
總結
- Direct-直連交換機適合那種一對一的訊息傳遞,通過指定的bandingKey來繫結交換機和佇列
- 使用Direct型別交換機時,bandingKey和佇列時多對多的關係,即一個佇列可以通過多個bandingKey來繫結交換機,一個bandingKey也可以繫結多個佇列到交換機,
- 這裡只演示了一個交換機繫結多個佇列的情況,其實一個佇列也是可以被繫結到不同的交換機的。
- 使用Direct型別交換機時,傳送訊息時,必須指定交換機和routingKey,RabbitMq會通過routingKey去匹配bandingKey,然後找到相應的佇列,找到就傳送訊息給這個佇列,找到多個就分別發訊息給找到的佇列,若找不到,訊息則丟失
- 在配置檔案中,我習慣將繫結佇列和交換機的key叫做bindingKey,在生產者傳送訊息時,引數值填的key叫做routingKey,其實這兩個東西都是為了匹配訊息和佇列使用的,並無實質差別。