1. 程式人生 > >RabbitMq學習(二)DirectExchange在springboot的用法

RabbitMq學習(二)DirectExchange在springboot的用法

依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.xquant</groupId>
	<artifactId>xquant-rabbitmq-send</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>xquant-rabbitmq-send</name>
	<description>xquant-rabbitmq-send</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

配置

在這個配置中,分以下幾步: 第一步:構建交換機DirectExchange 第二步:構建佇列Queue,這裡我定義了兩個:test1,test2 第三步:繫結交換機和佇列,這裡我用direct_key1這個bingdingKey來繫結DirectExchange和test1,用direct_key2這個bingdingKey來繫結DirectExchange和test2 類似於下圖: 在這裡插入圖片描述 第四步:構建ConnectionFactory,配置連結和使用者名稱密碼,例項化RabbitTemplate 程式碼如下:

package com.xquant.rabbitmq.send.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author chunhui.tan
 * @version 建立時間:2018年11月14日 下午1:29:21
 *
 */
@Configuration
public class DirectMqConfig {

	/**
	 * 交換機名稱
	 */
	public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";

	/**
	 * 繫結key,交換機繫結佇列時需要指定
	 */
	public static final String BINGDING_KEY_TEST1 = "direct_key1";
	public static final String BINGDING_KEY_TEST2 = "direct_key2";

	/**
	 * 佇列名稱
	 */
	public static final String QUEUE_TEST1 = "test1";
	public static final String QUEUE_TEST2 = "test2";

	/**
	 * 構建DirectExchange交換機
	 * 
	 * @return
	 */
	@Bean
	public DirectExchange directExchange() {
		// 支援持久化,長期不用補刪除
		return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
	}

	/**
	 * 構建序列
	 * 
	 * @return
	 */
	@Bean
	public Queue test1Queue() {
		// 支援持久化
		return new Queue(QUEUE_TEST1, true);
	}

	@Bean
	public Queue test2Queue() {
		// 支援持久化
		return new Queue(QUEUE_TEST2, true);
	}

	/**
	 * 繫結交交換機和
	 * 
	 * @return
	 */
	@Bean
	public Binding test1Binding() {
		return BindingBuilder.bind(test1Queue()).to(directExchange()).with(BINGDING_KEY_TEST1);
	}

	@Bean
	public Binding test2Binding() {
		return BindingBuilder.bind(test2Queue()).to(directExchange()).with(BINGDING_KEY_TEST2);
	}

	/**
	 * 配置
	 * 
	 * @return
	 */
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("123456");
		return connectionFactory;
	}

	/**
	 * 例項化操作模板
	 * 
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		return new RabbitTemplate(connectionFactory);
	}

}

生產者

為了方便測試,寫一個介面,可以看出在傳送訊息時需要指定交換機和路由key。 第一個介面傳送給佇列test1 第二個介面傳送給佇列test2

package com.xquant.rabbitmq.send.mq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author chunhui.tan
 * @version 建立時間:2018年11月14日 下午4:17:31
 *
 */
@RestController
public class ProducerController {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	/**
	 * 給test1佇列發訊息
	 * 
	 * @return
	 */
	@GetMapping("/sendMessage1")
	public Object sendMessage1() {
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentEncoding("UTF-8");
		Message message = new Message("你好,這是發給test1佇列的訊息".getBytes(), messageProperties);
		rabbitTemplate.send(DirectMqConfig.DIRECT_EXCHANGE_NAME, DirectMqConfig.BINGDING_KEY_TEST1, message);
		return "ok";
	}

	/**
	 * 給test2佇列發訊息
	 * 
	 * @return
	 */
	@GetMapping("/sendMessage2")
	public Object sendMessage2() {
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentEncoding("UTF-8");
		Message message = new Message("你好,這是發給test2佇列的訊息".getBytes(), messageProperties);
		rabbitTemplate.send(DirectMqConfig.DIRECT_EXCHANGE_NAME, DirectMqConfig.BINGDING_KEY_TEST2, message);
		return "ok";
	}

}

消費者

package com.xquant.rabbitmq.send.mq;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author chunhui.tan
 * @version 建立時間:2018年11月14日 下午4:21:24
 *
 */
@Component
public class Consumer {

	/**
	 * 監聽test1佇列
	 * 
	 * @param message
	 * @throws UnsupportedEncodingException
	 */
	@RabbitListener(queues = DirectMqConfig.QUEUE_TEST1)
	public void consumeMessage1(Message message) throws UnsupportedEncodingException {
		System.out.println("這是監聽test1得到的訊息:======" + new String(message.getBody(), "utf-8"));
	}

	/**
	 * 監聽test2佇列
	 * 
	 * @param message
	 * @throws UnsupportedEncodingException
	 */
	@RabbitListener(queues = DirectMqConfig.QUEUE_TEST2)
	public void consumeMessage2(Message message) throws UnsupportedEncodingException {
		System.out.println("這是監聽test1得到的訊息:======" + new String(message.getBody(), "utf-8"));
	}

}

啟動專案測試

多個繫結情況

以上演示的是一個bingdingKey繫結一個佇列的情況,其實我們可以用一個bingdingKey繫結多個佇列。 配置程式碼如下:

package com.xquant.rabbitmq.send.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author chunhui.tan
 * @version 建立時間:2018年11月14日 下午1:29:21
 *
 */
@Configuration
public class DirectMqConfig {

	/**
	 * 交換機名稱
	 */
	public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";

	/**
	 * 繫結key,交換機繫結佇列時需要指定
	 */
	public static final String BINGDING_KEY_TEST1 = "direct_key1";
	public static final String BINGDING_KEY_TEST2 = "direct_key2";

	/**
	 * 佇列名稱
	 */
	public static final String QUEUE_TEST1 = "test1";
	public static final String QUEUE_TEST2 = "test2";

	/**
	 * 構建DirectExchange交換機
	 * 
	 * @return
	 */
	@Bean
	public DirectExchange directExchange() {
		// 支援持久化,長期不用補刪除
		return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
	}

	/**
	 * 構建序列
	 * 
	 * @return
	 */
	@Bean
	public Queue test1Queue() {
		// 支援持久化
		return new Queue(QUEUE_TEST1, true);
	}

	@Bean
	public Queue test2Queue() {
		// 支援持久化
		return new Queue(QUEUE_TEST2, true);
	}

	/**
	 * 繫結交交換機和
	 * 
	 * @return
	 */
	@Bean
	public Binding test1Binding() {
		return BindingBuilder.bind(test1Queue()).to(directExchange()).with(BINGDING_KEY_TEST1);
	}
	/**
	 * 修改這裡,將繫結的key變成BINGDING_KEY_TEST1,使兩個佇列通過同一個bindingkey繫結到交換機
	 * 
	 * @return
	 */
	@Bean
	public Binding test2Binding() {
		return BindingBuilder.bind(test2Queue()).to(directExchange()).with(BINGDING_KEY_TEST1);
	}

	/**
	 * 配置
	 * 
	 * @return
	 */
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("47.110.34.160", 5672);
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("123456");
		return connectionFactory;
	}

	/**
	 * 例項化操作模板
	 * 
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		return new RabbitTemplate(connectionFactory);
	}

}

變化很小,只是test2Binding()方法中將繫結的key變成BINGDING_KEY_TEST1,使兩個佇列通過同一個bindingkey繫結到交換機。 生產者介面還是不變 消費者部分程式碼也不變 先web頁面刪除交換機和佇列 然後啟動專案測試 檢視web頁面: 在這裡插入圖片描述 通過同一個bindingkey將兩個佇列繫結到同一個交換機 生產者介面中,sendMessage2()方法中是這樣的:

/**
	 * 給test2佇列發訊息
	 * 
	 * @return
	 */
	@GetMapping("/sendMessage2")
	public Object sendMessage2() {
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentEncoding("UTF-8");
		Message message = new Message("你好,這是發給test2佇列的訊息".getBytes(), messageProperties);
		rabbitTemplate.send(DirectMqConfig.DIRECT_EXCHANGE_NAME, DirectMqConfig.BINGDING_KEY_TEST2, message);
		return "ok";
	}

顯然這裡在發訊息時指定的路由key是direct_key2,所以這條訊息應該會丟失的。 我們訪問sendMessage1介面,控制檯列印如下: 在這裡插入圖片描述 可以看到,分別監聽test1和test2的監聽器都監聽到了通過y direct_key1這個路由key傳送的訊息。

訪問sendMessage2介面, 在這裡插入圖片描述 控制檯空空如也,原本通過direct_key2這個路由鍵傳送的訊息丟失了,因為沒有佇列通過這個路由鍵來繫結交換機。

總結

  • Direct-直連交換機適合那種一對一的訊息傳遞,通過指定的bandingKey來繫結交換機和佇列
  • 使用Direct型別交換機時,bandingKey和佇列時多對多的關係,即一個佇列可以通過多個bandingKey來繫結交換機,一個bandingKey也可以繫結多個佇列到交換機,
  • 這裡只演示了一個交換機繫結多個佇列的情況,其實一個佇列也是可以被繫結到不同的交換機的。
  • 使用Direct型別交換機時,傳送訊息時,必須指定交換機和routingKey,RabbitMq會通過routingKey去匹配bandingKey,然後找到相應的佇列,找到就傳送訊息給這個佇列,找到多個就分別發訊息給找到的佇列,若找不到,訊息則丟失
  • 在配置檔案中,我習慣將繫結佇列和交換機的key叫做bindingKey,在生產者傳送訊息時,引數值填的key叫做routingKey,其實這兩個東西都是為了匹配訊息和佇列使用的,並無實質差別。