1. 程式人生 > 其它 >RabbitMq: 主題交換機的使用(Topic Exchange)

RabbitMq: 主題交換機的使用(Topic Exchange)

主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和繫結鍵之間是有規則的。
簡單地介紹下規則:

* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞

通配的繫結鍵是跟佇列進行繫結的,例:

佇列Q1 繫結鍵為 .TT.
佇列Q2繫結鍵為 TT.#

如果一條訊息攜帶的路由鍵為 A.TT.B,那麼佇列Q1將會收到;
如果一條訊息攜帶的路由鍵為TT.AA.BB,那麼佇列Q2將會收到;

當一個佇列的繫結鍵為 "#"(井號) 的時候,這個佇列將會無視訊息的路由鍵,接收所有的訊息。
當 * (星號) 和 # (井號) 這兩個特殊字元都未在繫結鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。

實現案例

先實現一個配置類,定義了兩個不同的佇列和一個交換機進行繫結

package com.example.demo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lyd
 * @Description: 主題交換機
 * @date 14:11
 */
@Configuration
public class TopicRabbitConfig {

	// 佇列名繫結鍵
	public static final String QUEUE_MAN = "topic.man";
	public static final String QUEUE_WOMAN = "topic.woman";

	// 交換機繫結建
	public static final String TOPIC_EXANGE = "topicExchange";

	/**
	 * man佇列
	 *
	 * @return
	 */
	@Bean
	public Queue manQueue() {
		return new Queue(QUEUE_MAN,true);
	}

	/**
	 * woman佇列
	 *
	 * @return
	 */
	@Bean
	public Queue womanQueue() {
		return new Queue(QUEUE_WOMAN,true);
	}

	/**
	 * 交換機
	 *
	 * @return
	 */
	@Bean
	public TopicExchange exchange() {
		return new TopicExchange(TOPIC_EXANGE);
	}

	/**
	 * 將man佇列和交換機繫結,並指定匹配鍵關鍵字為topic.man,這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列
	 *
	 * @return
	 */
	@Bean
	Binding bindingExchangeMessageMan() {
		return BindingBuilder.bind(manQueue()).to(exchange()).with(QUEUE_MAN);
	}

	/**
	 * 將woman佇列和交換機繫結,並指定匹配鍵關鍵字為topic.woman,這樣只要是訊息攜帶的路由鍵是topic.woman,才會分發到該佇列
	 *
	 * @return
	 */
	@Bean
	Binding bindingExchangeMessageWoman() {
		return BindingBuilder.bind(womanQueue()).to(exchange()).with(QUEUE_WOMAN);
	}

}

定義兩個介面,分別給兩個佇列傳送訊息

package com.example.demo.controller;

import com.example.demo.config.DirectRabbitConfig;
import com.example.demo.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author lyd
 * @Description:
 * @date 11:29
 */
@RestController
public class ConvertMessController {

	@Autowired
	RabbitTemplate rabbitTemplate;

	@RequestMapping("sendManMessage")
	@ResponseBody
	public String sendManMessage(){

		String messageId = String.valueOf(UUID.randomUUID());
		String messageData = "我是男人";

		// 將要傳送的訊息放進Map型別中
		Map<String,Object> map=new HashMap<>();
		map.put("messageId",messageId);
		map.put("messageData",messageData);

		rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_MAN,map);
		return "ok";
	}

	@RequestMapping("sendWomanMessage")
	@ResponseBody
	public String sendWomanMessage(){

		String messageId = String.valueOf(UUID.randomUUID());
		String messageData = "我是女人";

		// 將要傳送的訊息放進Map型別中
		Map<String,Object> map=new HashMap<>();
		map.put("messageId",messageId);
		map.put("messageData",messageData);

		rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_WOMAN,map);
		return "ok";
	}


}

監聽兩個佇列,接收訊息

package com.example.demo.controller;

import com.example.demo.config.TopicRabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * @author lyd
 * @Description:
 * @date 14:26
 */
@Component
public class TopicReceiver {

	@RabbitListener(queues = TopicRabbitConfig.QUEUE_MAN)
	public void processMan(Message message){
		System.out.println("男消費者接收到的訊息:"+message);
	}

	@RabbitListener(queues = TopicRabbitConfig.QUEUE_WOMAN)
	public void processWoman(Message message){
		System.out.println("女消費者接收到的訊息:"+message);
	}


}

yml配置檔案

server:
  port: 8080

spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
    virtual-host: /

呼叫介面 http://127.0.0.1:8080/sendManMessage ,接收到訊息

呼叫介面 http://127.0.0.1:8080/sendWomanMessage ,接收到訊息