RabbitMq: 主題交換機的使用(Topic Exchange)
阿新 • • 發佈:2021-06-25
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和繫結鍵之間是有規則的。
簡單地介紹下規則:
* (星號) 用來表示一個單詞 (必須出現的)
# (井號) 用來表示任意數量(零個或多個)單詞
通配的繫結鍵是跟佇列進行繫結的,例:
佇列Q1 繫結鍵為 .TT.
佇列Q2繫結鍵為 TT.#
如果一條訊息攜帶的路由鍵為 A.TT.B,那麼佇列Q1將會收到;
如果一條訊息攜帶的路由鍵為TT.AA.BB,那麼佇列Q2將會收到;
當一個佇列的繫結鍵為 "#"(井號) 的時候,這個佇列將會無視訊息的路由鍵,接收所有的訊息。
當 * (星號) 和 # (井號) 這兩個特殊字元都未在繫結鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。
實現案例
先實現一個配置類,定義了兩個不同的佇列和一個交換機進行繫結
package com.example.demo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lyd * @Description: 主題交換機 * @date 14:11 */ @Configuration public class TopicRabbitConfig { // 佇列名繫結鍵 public static final String QUEUE_MAN = "topic.man"; public static final String QUEUE_WOMAN = "topic.woman"; // 交換機繫結建 public static final String TOPIC_EXANGE = "topicExchange"; /** * man佇列 * * @return */ @Bean public Queue manQueue() { return new Queue(QUEUE_MAN,true); } /** * woman佇列 * * @return */ @Bean public Queue womanQueue() { return new Queue(QUEUE_WOMAN,true); } /** * 交換機 * * @return */ @Bean public TopicExchange exchange() { return new TopicExchange(TOPIC_EXANGE); } /** * 將man佇列和交換機繫結,並指定匹配鍵關鍵字為topic.man,這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列 * * @return */ @Bean Binding bindingExchangeMessageMan() { return BindingBuilder.bind(manQueue()).to(exchange()).with(QUEUE_MAN); } /** * 將woman佇列和交換機繫結,並指定匹配鍵關鍵字為topic.woman,這樣只要是訊息攜帶的路由鍵是topic.woman,才會分發到該佇列 * * @return */ @Bean Binding bindingExchangeMessageWoman() { return BindingBuilder.bind(womanQueue()).to(exchange()).with(QUEUE_WOMAN); } }
定義兩個介面,分別給兩個佇列傳送訊息
package com.example.demo.controller; import com.example.demo.config.DirectRabbitConfig; import com.example.demo.config.TopicRabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @author lyd * @Description: * @date 11:29 */ @RestController public class ConvertMessController { @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("sendManMessage") @ResponseBody public String sendManMessage(){ String messageId = String.valueOf(UUID.randomUUID()); String messageData = "我是男人"; // 將要傳送的訊息放進Map型別中 Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_MAN,map); return "ok"; } @RequestMapping("sendWomanMessage") @ResponseBody public String sendWomanMessage(){ String messageId = String.valueOf(UUID.randomUUID()); String messageData = "我是女人"; // 將要傳送的訊息放進Map型別中 Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXANGE,TopicRabbitConfig.QUEUE_WOMAN,map); return "ok"; } }
監聽兩個佇列,接收訊息
package com.example.demo.controller;
import com.example.demo.config.TopicRabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author lyd
* @Description:
* @date 14:26
*/
@Component
public class TopicReceiver {
@RabbitListener(queues = TopicRabbitConfig.QUEUE_MAN)
public void processMan(Message message){
System.out.println("男消費者接收到的訊息:"+message);
}
@RabbitListener(queues = TopicRabbitConfig.QUEUE_WOMAN)
public void processWoman(Message message){
System.out.println("女消費者接收到的訊息:"+message);
}
}
yml配置檔案
server:
port: 8080
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: localhost
username: guest
password: guest
port: 5672
virtual-host: /
呼叫介面 http://127.0.0.1:8080/sendManMessage ,接收到訊息
呼叫介面 http://127.0.0.1:8080/sendWomanMessage ,接收到訊息