1. 程式人生 > 實用技巧 >三、RabbitMQ的交換機型別

三、RabbitMQ的交換機型別

四種類型:

1.FanoutExchange(扇型交換機)-全部路由
廣播模式或者訂閱模式,可以同時繫結多個queue, 傳送訊息時,無需指定Routing Key
適用場景:廣播,群聊,新聞推送
2.DirectExchange(直連交換機)- 根據Routing Key路由
直連,通過Routing Key繫結queue,當傳送訊息到交換機時,會根據配置的Routing Key路由到不同的queue中
當N個queue的Routing Key相同時,訊息會被同時路由到這N個queue中去
3.HeadersExchange(頭交換機)- 根據header的匹配規則路由
設定交換機的匹配header的規則,支援單個精確匹配where,部分匹配whereAny,全部匹配whereAll,根據匹配的結果路由到相應的queue中

4.TopicExchange(主題交換機)
主題交換機通過有匹配規則的路由鍵和佇列繫結,*.a.*,#.b.#,*代表匹配任意一個單詞,#代表匹配任意一個或多個單詞
傳送訊息時,設定路由鍵,如設定為l.a.m,則會被路由到繫結*.a.*的佇列去,設定為l.b則會被路由到繫結#.b.#的佇列去,設定為l.a.b.c會同時到兩個佇列中,設定l.m則會被丟棄

PS:CustomExchange不是一種固定的型別,是用來配合外掛一起使用的,具體參考上篇https://www.cnblogs.com/Hleaves/p/13594278.html

測試:

//初始化交換機和佇列資訊

import org.springframework.amqp.core.*;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQExchangeConfig { public static
final String DIRECT_EXCHANGE_NAME = "direct-Exchange"; public static final String FANOUT_EXCHANGE_NAME = "fanout-Exchange"; public static final String CUSTOM_EXCHANGE_NAME = "custom-Exchange"; public static final String HEADERS_EXCHANGE_NAME = "headers-Exchange"; public static final String TOPIC_EXCHANGE_NAME = "topic-Exchange"; public static final String QUEUEA_NAME = "queueA"; public static final String QUEUEB_NAME = "queueB"; public static final String ROUTING_KEY_A_NAME = "routingKeyA"; public static final String ROUTING_KEY_B_NAME = "routingKeyB"; @Bean DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE_NAME); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE_NAME); } @Bean CustomExchange customExchange() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(CUSTOM_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean HeadersExchange headersExchange() { Map<String, Object> args = new HashMap<>(); args.put("HeaderA", "aaa"); args.put("HeaderB", "bbb"); return (HeadersExchange) ExchangeBuilder.headersExchange(HEADERS_EXCHANGE_NAME).withArguments(args).build(); } @Bean TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE_NAME); } @Bean Queue queueA() { return new Queue(QUEUEA_NAME); } @Bean Queue queueB() { return new Queue(QUEUEB_NAME); } @Bean Binding bindingAD(Queue queueA, DirectExchange directExchange) { return BindingBuilder.bind(queueA).to(directExchange).with(ROUTING_KEY_A_NAME); } @Bean Binding bindingAF(Queue queueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueA).to(fanoutExchange); } @Bean Binding bindingAC(Queue queueA, CustomExchange customExchange) { return BindingBuilder.bind(queueA).to(customExchange).with(ROUTING_KEY_A_NAME).noargs(); } @Bean Binding bindingAH(Queue queueA, HeadersExchange headersExchange) { //精確匹配 // return BindingBuilder.bind(queueA).to(headersExchange).where("Header").matches("ccc"); //部分匹配 return BindingBuilder.bind(queueA).to(headersExchange).whereAny("HeaderA", "HeaderB").exist(); } @Bean Binding bindingAT(Queue queueA, TopicExchange topicExchange) { return BindingBuilder.bind(queueA).to(topicExchange).with("*." + ROUTING_KEY_A_NAME); } @Bean Binding bindingBD(Queue queueB, DirectExchange directExchange) { return BindingBuilder.bind(queueB).to(directExchange).with(ROUTING_KEY_B_NAME); } @Bean Binding bindingBF(Queue queueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueB).to(fanoutExchange); } @Bean Binding bindingBC(Queue queueB, CustomExchange customExchange) { return BindingBuilder.bind(queueB).to(customExchange).with(ROUTING_KEY_B_NAME).noargs(); } @Bean Binding bindingBH(Queue queueB, HeadersExchange headersExchange) { //全部匹配 return BindingBuilder.bind(queueB).to(headersExchange).whereAll("HeaderA", "HeaderB").exist(); } @Bean Binding bindingBT(Queue queueB, TopicExchange topicExchange) { return BindingBuilder.bind(queueB).to(topicExchange).with("#." + ROUTING_KEY_B_NAME); } //先初始化佇列 @Bean @ConditionalOnBean(Queue.class) MQExchangeConsumer mqExchangeConsumer() { return new MQExchangeConsumer(); } }

//傳送訊息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import static com.mhou.rabbitmq.exchange.MQExchangeConfig.*;

@RestController
@Slf4j
public class MQExchangeSender {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendDiffExchange")
    public void sendDiffExchange(String exchangeType, String msg) {
        log.info("msg-{}", msg);
        switch (exchangeType) {
            case "f":
                //routingkey的指定沒有實際意義,可以為空
                rabbitTemplate.convertSendAndReceive(FANOUT_EXCHANGE_NAME, "", msg + "a");
                rabbitTemplate.convertSendAndReceive(FANOUT_EXCHANGE_NAME, "", msg + "b");
                break;
            case "c":
                rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a", message -> {
                    message.getMessageProperties().setDelay(10000);
                    return message;
                });
                rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b", message -> {
                    message.getMessageProperties().setHeader("x-delay", 20000);
                    return message;
                });
                break;
            case "h":
                rabbitTemplate.convertSendAndReceive(HEADERS_EXCHANGE_NAME, "", msg + "a", message -> {
                    message.getMessageProperties().setHeader("HeaderA", "aaa");
                    return message;
                });
                rabbitTemplate.convertSendAndReceive(HEADERS_EXCHANGE_NAME, "", msg + "b", message -> {
                    message.getMessageProperties().setHeader("HeaderA", "aaa");
                    message.getMessageProperties().setHeader("HeaderB", "bbb");
                    return message;
                });
                break;
            case "t":
                rabbitTemplate.convertSendAndReceive(TOPIC_EXCHANGE_NAME, ROUTING_KEY_A_NAME + "." + ROUTING_KEY_B_NAME, msg + "a");
                rabbitTemplate.convertSendAndReceive(TOPIC_EXCHANGE_NAME, ROUTING_KEY_A_NAME + "." + ROUTING_KEY_B_NAME, msg + "b");
                break;
            default:
                rabbitTemplate.convertSendAndReceive(DIRECT_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a");
                rabbitTemplate.convertSendAndReceive(DIRECT_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b");
        }
    }
}

//結果

1.FanoutExchange ,傳送到交換機的訊息可以同時被路由到AB佇列

2.HeadersExchange, 訊息頭部含有HeaderA或者HeaderB的訊息會被路由到A佇列,同時含有HeaderA和HeaderB的訊息會被路由到B佇列

3.TopicExchange, 路由鍵routingKeyA.routingKeyB 匹配 #.routingKeyB,不匹配*.routingKeyA ,因此兩條訊息都會被路由到B佇列

4.DirectExchange,直接根據設定的路由鍵繫結佇列