1. 程式人生 > 其它 >RabbitMQ(十)——Routing 之訂閱模型-Topic

RabbitMQ(十)——Routing 之訂閱模型-Topic

Routing 之訂閱模型-Topic

Topic簡介

Topic型別的ExchangeDirect相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在繫結Routing key 的時候使用萬用字元!這種模型Routingkey 一般都是由一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert

Topic 的要求

傳送到型別是 topic 交換機的訊息的 routing_key 不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞,比如說:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".這種型別的。當然這個單詞列表最多不能超過 255 個位元組。

在這個規則列表中,其中有兩個替換符需要注意:

*(星號)可以代替一個單詞
#(井號)可以替代零個或多個單詞

Topic 匹配案例

下圖繫結關係如下:
Q1-->繫結的是

  • 中間帶 orange 帶 3 個單詞的字串(.orange.)

Q2-->繫結的是

  • 最後一個單詞是 rabbit 的 3 個單詞(..rabbit)
  • 第一個單詞是 lazy 的多個單詞(lazy.#)

上圖是一個佇列繫結關係圖,我們來看看他們之間資料接收情況是怎麼樣的
quick.orange.rabbit------->被佇列 Q1Q2 接收到
lazy.orange.elephant ------->被佇列 Q1Q2 接收到
quick.orange.fox ------->被佇列 Q1 接收到
lazy.brown.fox ------->被佇列 Q2 接收到
lazy.pink.rabbit-------> 雖然滿足兩個繫結但只被佇列 Q2 接收一次
quick.brown.fox ------->不匹配任何繫結不會被任何佇列接收到會被丟棄
quick.orange.male.rabbit-------> 是四個單詞不匹配任何繫結會被丟棄
lazy.orange.male.rabbit-------> 是四個單詞但匹配 Q2

實戰

消費者一

package com.study.rabbitmq.seven;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.study.rabbitmq.utils.RabbitMQUtils;

//消費者
public class ReceiveLogsTopic01 {
    //交換機名稱
    public static final String EXCHANGE_NAME = "topic_logs";
    //接收訊息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //宣告佇列
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        //繫結通道
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接收訊息.....");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收佇列:"+queueName + "繫結key:" + message.getEnvelope().getRoutingKey());
        };

        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
    }
}

消費者二

package com.study.rabbitmq.seven;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.study.rabbitmq.utils.RabbitMQUtils;

//消費者
public class ReceiveLogsTopic02 {
    //交換機名稱
    public static final String EXCHANGE_NAME = "topic_logs";
    //接收訊息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        //宣告交換機
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //宣告佇列
        String queueName = "Q2";

        channel.queueDeclare(queueName,false,false,false,null);
        //繫結通道
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("等待接收訊息.....");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收佇列:"+queueName + "繫結key:" + message.getEnvelope().getRoutingKey());
        };

        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
    }
}

生產者

package com.study.rabbitmq.seven;

import com.rabbitmq.client.Channel;
import com.study.rabbitmq.utils.RabbitMQUtils;

import java.util.HashMap;
import java.util.Map;

//生產者
public class EmitLogTopic {
    //交換機名稱
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        /*
        * Q1-->繫結的是
        *           中間帶orange的3個單詞的通道(*.orange.*)
        * Q2-->繫結的是
        *           最後一個單詞是rabbit的3個單詞的通道(*.*.rabbit)
        *           第一個單詞是lazy的多個單詞的通道(lazy.#)
        * */
        HashMap<String, String> bindingKeymap = new HashMap<>();
        bindingKeymap.put("quick.orange.rabbit","被佇列Q1Q2接收到");
        bindingKeymap.put("lazy.orange.elephant","被佇列Q1Q2接收到");
        bindingKeymap.put("quick.orange.fox","被佇列Q1接收到");
        bindingKeymap.put("lazy.brown.fox","被佇列Q2接收到");
        bindingKeymap.put("lazy.pink.rabbit","雖然滿足兩個繫結,但只被佇列Q2接收一次");
        bindingKeymap.put("quick.brown.fox","不匹配任何繫結不會被任何佇列接收到,會被丟棄");
        bindingKeymap.put("quick.orange.male.rabbit","是四個單詞不匹配任何繫結,會被丟棄");
        bindingKeymap.put("lazy.orange.male.rabbit","被Q2接收到");

        for (Map.Entry<String, String> bindingKeyEntry : bindingKeymap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生產者發出訊息:" + message);
        }
    }
}

執行測試: