1. 程式人生 > 其它 >RabbitMQ 之 Exchange

RabbitMQ 之 Exchange

一、交換機相關概念

1、交換機的作用

RabbitMQ 在傳遞的訊息過程中,生產者和佇列之間是沒有直接聯絡的,生產者生產的訊息要推送到佇列中需要藉助於交換機,交換機就是生產者和佇列的中間橋樑.交換機的工作內容非常簡單,一方面它接收來自生產者的訊息,另外一方面是將訊息推送到佇列中.

2、交換機的型別

交換機在接收到生產者生產的訊息之後必須要知道該如何處理這些訊息,是將訊息推送到特定的佇列上還是說丟棄他們,訊息具體該如何處理就由交換機的型別來決定了,常用的交換機型別通常有 default、fanout、direct、headers、topic

宣告(AMQP default) 預設的交換機

// 第一個引數如果是空字串("")則代表使用預設交換機 (AMQP default)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

宣告 fanout、direct、headers、topic 型別的交換機

// 宣告一個 fanout 型別的交換機 exchange01
channel.exchangeDeclare("exchange01", BuiltinExchangeType.FANOUT);
// 宣告一個 direct 型別的交換機 exchange02
channel.exchangeDeclare("exchange02", BuiltinExchangeType.DIRECT);
// 宣告一個 headers 型別的交換機 exchange03
channel.exchangeDeclare("exchange03", BuiltinExchangeType.HEADERS);
// 宣告一個 topic 型別的交換機 exchange04
channel.exchangeDeclare("exchange04", BuiltinExchangeType.TOPIC);

雖然交換機的型別有很多,但是在實際的應用中 headers 不常用,所以我們下面主要看一下 fanout、direct、topic 的用法

二、fanout

fanout 這種型別非常簡單,它是將接收到的所有訊息廣播到它所知道的所有佇列中,交換機和佇列使用的 routingkey (binding key) 為空字串("")

1、原理圖

可以通過如下方式宣告臨時佇列

// 宣告一個臨時的佇列
String queue = channel.queueDeclare().getQueue();

宣告的臨時佇列如下,它是一個 AutoDelete、Exclusive 型別的佇列

2、工具類

public class RabbitmqUtils {
    private static final String HOST_ADDRESS = "192.168.59.130";
    private static final String USER_NAME = "admin";
    private static final String PASSWORD = "admin123";

    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_ADDRESS);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

3、Consumer01

public class Consumer01 {
    private static final String EXCHANGE_NAME = "exchange_xiaomaomao";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();

        // 宣告交換機
        // 消費者端需要將交換機和佇列進行繫結,這樣交換機便能將訊息推送到指定的佇列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // 宣告一個臨時的佇列
        String queue = channel.queueDeclare().getQueue();

        // 將交換機和佇列進行繫結
        // routingKey 使用空字串("")
        channel.queueBind(queue, EXCHANGE_NAME, "", null);

        // 訊息成功之後的回撥
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        // 取消消費者的回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回撥介面");
        };
        // 消費者消費訊息
        channel.basicConsume(queue, deliverCallback, cancelCallback);

        System.out.println("Consumer01 開始消費");
    }
}

4、Consumer02

public class Consumer02 {
    private static final String EXCHANGE_NAME = "exchange_xiaomaomao";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();

        // 宣告交換機
        // 消費者端需要將交換機和佇列進行繫結,這樣交換機便能將訊息推送到指定的佇列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // 宣告一個臨時的佇列
        String queue = channel.queueDeclare().getQueue();

        // 將交換機和佇列進行繫結
        // routingKey 使用空字串("")
        channel.queueBind(queue, EXCHANGE_NAME, "", null);

        // 訊息成功之後的回撥
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };

        // 取消消費者的回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回撥介面");
        };
        // 消費者消費訊息
        channel.basicConsume(queue, deliverCallback, cancelCallback);

        System.out.println("Consumer02 開始消費");
    }
}

5、Producer

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_xiaomaomao";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();

        // 宣告一個 fanout 型別的交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // 要傳送的訊息
        String message = "xiaomaomao";
        for (int i = 1; i < 6; i++) {
            // 生產者推送訊息到佇列
            // MessageProperties.PERSISTENT_TEXT_PLAIN : 持久化訊息
            channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("Producer send message successfully...");
    }
}

6、測試及結果

首先啟動 Consumer01、Consumer02、然後再啟動 Producer 傳送訊息

RabbitMQ 控制檯 Exchanges

選擇該交換機點進去檢視詳情

Consumer01、Consumer02 的消費情況如下

三、direct

通過上面的案例,我們知道 fanout 型別的交換機是通過廣播的方式將它接收到的訊息傳送給它知道的所有佇列,也就是說所有的佇列都能收到相同的訊息,但是在某些場景下是不適用的,例如我想把重要的訊息和普通的訊息分離開來,分別傳送給不同的佇列,使用 fanout 是做不到的,這個時候我們就可以通過 direct 型別的交換機來實現

1、原理圖

2、Consumer01

public class Consumer01 {
    private static final String EXCHANGE_NAME = "exchange_xiaomaomao";
    private static final String IMPORTANT_QUEUE_NAME = "Queue01";
    private static final String IMPORTANT_ROUTING_KEY = "important";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();
        // 宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 宣告一個臨時的佇列
        channel.queueDeclare(IMPORTANT_QUEUE_NAME, true, false, false, null);
        // 將交換機和佇列進行繫結
        channel.queueBind(IMPORTANT_QUEUE_NAME, EXCHANGE_NAME, IMPORTANT_ROUTING_KEY, null);

        // 訊息成功之後的回撥
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消費者的回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回撥介面");
        };
        // 消費者消費訊息
        channel.basicConsume(IMPORTANT_QUEUE_NAME, deliverCallback, cancelCallback);

        System.out.println("Consumer01 開始消費");
    }
}

3、Consumer02

public class Consumer02 {
    private static final String EXCHANGE_NAME = "exchange_xiaomaomao";
    private static final String NORMAL_QUEUE_NAME = "Queue02";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();
        // 宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 宣告一個臨時的佇列
        channel.queueDeclare(NORMAL_QUEUE_NAME, true, false, false, null);
        // 將交換機和佇列進行繫結
        channel.queueBind(NORMAL_QUEUE_NAME, EXCHANGE_NAME, NORMAL_ROUTING_KEY, null);

        // 訊息成功之後的回撥
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(msg);
        };
        // 取消消費者的回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回撥介面");
        };
        // 消費者消費訊息
        channel.basicConsume(NORMAL_QUEUE_NAME, deliverCallback, cancelCallback);

        System.out.println("Consumer02 開始消費");
    }
}

4、Producer

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_xiaomaomao";
    private static final String IMPORTANT_QUEUE_NAME = "Queue01";
    private static final String IMPORTANT_ROUTING_KEY = "important";
    private static final String NORMAL_QUEUE_NAME = "Queue02";
    private static final String NORMAL_ROUTING_KEY = "normal";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();

        // 宣告一個 fanout 型別的交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 要傳送的訊息
        List<String> strList = new ArrayList<>();
        strList.add("important1");
        strList.add("important2");
        strList.add("important3");
        strList.add("normal1");
        strList.add("normal2");
        strList.add("normal3");

        if (!CollectionUtils.isEmpty(strList)) {
            strList.forEach((item) -> {
                if (item.contains("important")) {
                    try {
                        channel.basicPublish(EXCHANGE_NAME, IMPORTANT_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, item.getBytes(StandardCharsets.UTF_8));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                } else {
                    try {
                        channel.basicPublish(EXCHANGE_NAME, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, item.getBytes(StandardCharsets.UTF_8));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        System.out.println("Producer send message successfully...");
    }
}

5、測試及結果

檢視交換機詳情

檢視佇列消費情況

Consumer01、Consumer02 消費情況

四、topic

從上面的例子可以看出,direct 型別的交換機已經可以比較靈活的處理訊息了,但是 direct 模式的 routing key 一旦給定就無法再發生變化了,為了使交換機適用於更加靈活的場景,我們引入了 topic 模式

topic 型別交換機的 routing key 是不能隨意編寫的,它需要滿足一定的規範,首先它必須是一個單詞列表,以點號(.)分隔開,這些單詞可以是任意的單詞,例如 ncu.tech.north、stock.query.show、make.use.toy 等等,當然這些單詞的列表總長度不能超過 255 個位元組,在規則列表中可以使用萬用字元來代替

* : 代替一個單詞

# : 代替 0 個或者多個單詞

1、原理圖

2、Consumer01

public class Consumer01 {
    private static final String EXCHANGE_NAME = "exchange_topic_demo";
    private static final String FIRST_QUEUE_NAME = "Queue01";
    private static final String FIRST_ROUTING_KEY = "*.rabbit.*";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();
        // 宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 宣告一個臨時的佇列
        channel.queueDeclare(FIRST_QUEUE_NAME, true, false, false, null);
        // 將交換機和佇列進行繫結
        channel.queueBind(FIRST_QUEUE_NAME, EXCHANGE_NAME, FIRST_ROUTING_KEY, null);

        // 訊息成功之後的回撥
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(message.getEnvelope().getRoutingKey() + "   " + msg);
        };
        // 取消消費者的回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回撥介面");
        };
        // 消費者消費訊息
        channel.basicConsume(FIRST_QUEUE_NAME, deliverCallback, cancelCallback);

        System.out.println("Consumer01 開始消費,Consumer01 的 routingKey 為" + FIRST_ROUTING_KEY);
    }
}

3、Consumer02

public class Consumer02 {
    private static final String EXCHANGE_NAME = "exchange_topic_demo";
    private static final String SECOND_QUEUE_NAME = "Queue02";
    private static final String SECOND_ROUTING_KEY_1 = "#.clever";
    private static final String SECOND_ROUTING_KEY_2 = "java.*.*";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();
        // 宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 宣告一個臨時的佇列
        channel.queueDeclare(SECOND_QUEUE_NAME, true, false, false, null);
        // 將交換機和佇列進行繫結(多重繫結)
        channel.queueBind(SECOND_QUEUE_NAME, EXCHANGE_NAME, SECOND_ROUTING_KEY_1, null);
        channel.queueBind(SECOND_QUEUE_NAME, EXCHANGE_NAME, SECOND_ROUTING_KEY_2, null);

        // 訊息成功之後的回撥
        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody());
            System.out.println(message.getEnvelope().getRoutingKey() + "   " + msg);
        };
        // 取消消費者的回撥
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("取消消費者時的回撥介面");
        };
        // 消費者消費訊息
        channel.basicConsume(SECOND_QUEUE_NAME, deliverCallback, cancelCallback);

        System.out.println("Consumer02 開始消費,Consumer02 的 routingKey 為" + SECOND_ROUTING_KEY_1 + "  " + SECOND_ROUTING_KEY_2);
    }
}

4、Producer

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_topic_demo";

    public static void main(String[] args) throws Exception {
        // 自定義工具類獲取通道
        Channel channel = RabbitmqUtils.getChannel();

        // 宣告一個 fanout 型別的交換機
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // "*.rabbit.*"             "#.clever"  "java.*.*"
        Map<String, String> map = new HashMap<>();
        map.put("python.rabbit.nothing", "被佇列 Q1 接收到");
        map.put("java.python.hello", "被佇列 Q2 接收到");
        map.put("quick.rabbit.clever", "被佇列 Q1 Q2 接收到");
        map.put("java.rabbit.fox", "被佇列 Q1 Q2 接收到");

        map.entrySet().forEach((item) -> {
            try {
                // 傳送訊息
                channel.basicPublish(EXCHANGE_NAME, item.getKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, item.getValue().getBytes(StandardCharsets.UTF_8));
                System.out.println("傳送的訊息為; " + item.getKey()+"----" + item.getValue());
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        System.out.println("Producer send message successfully...");
    }
}

5、測試及結果

檢視交換機詳情

檢視佇列情況

Consumer01、Consumer02 消費情況