1. 程式人生 > 其它 >RabbitMQ系列--主題(Topic)

RabbitMQ系列--主題(Topic)

技術標籤:中介軟體# RabbitMQ

上一篇部落格中,我們使用了direct型別的交換機,使得消費者有能力進行選擇性的訊息。但是仍然存在一些侷限性:它不能夠基於多重條件進行路由選擇。 我們可以使用Topic型別的交換機解決這個問題。

Topic型別的Exchange與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在繫結Routing key的時候可以使用萬用字元!這種模型的Routingkey一般都是由一共或多個單片語成,多個單詞之間以"."分割,例如:item.insert;
在這裡插入圖片描述
萬用字元

  • #匹配0個或多個詞
  • *匹配不多不少恰好1個詞

比如上圖中的 * .orange. * 與Q1佇列進行了繫結,* .* . rabbit和lazy.#與Q2佇列進行了繫結, 那麼生產者傳送路由key為user.orange.key會被消費者C1接收到。 傳送路由key為user.orange.rabbit的訊息,會被消費者C1和消費者C2都接收到。 如果傳送lazy或lazy.aa或lazy.aa.aa 都會被消費者C2接收到。

接下來讓我們用程式碼來驗證下;

1.定義生產者

public class Provider {
    public static void main(String[] args) throws
IOException { //獲取連線物件 Connection connection = RabbitMQUtils.getConnection(); //獲取通道 Channel channel = connection.createChannel(); //將通道宣告指定交換機 //引數1:交換機名稱 引數2:交換機的型別 direct 路由型別 //沒有交換機會建立一共名為logs的交換機 channel.exchangeDeclare("topics"
,"topic"); //路由Key String routerKey = "user.orange.key"; //傳送訊息 channel.basicPublish("topics",routerKey,null,("這是基於topics的 [ "+ routerKey+"]的訊息").getBytes()); //關閉連線和通道 RabbitMQUtils.closeChannelAndConnection(channel,connection); } }

2.定義消費者1

public class Customer1 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("topics","topic");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"topics","*.orange.*");
        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new String(body));
            }
        });
    }
}

3.定義消費者2

public class Customer2 {
    public static void main(String[] args) throws IOException {
        //獲取連線物件
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        //通道繫結交換機
        channel.exchangeDeclare("topics","topic");

        //建立一個臨時的、唯一的佇列
        //返回的是 臨時佇列名
        String queueName = channel.queueDeclare().getQueue();

        //繫結交換機和佇列
        //引數1: 佇列名稱  引數2:交換機名稱  引數3:路由名稱
        channel.queueBind(queueName,"topics","*.*.rabbit");
        channel.queueBind(queueName,"topics","lazy.#");

        //消費訊息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new String(body));
            }
        });
    }
}

先執行消費者1和消費者2,在執行生產者。 可以發現只有消費者1接收到了訊息
在這裡插入圖片描述
在這裡插入圖片描述

修改生產者的路由Key

 String routerKey = "user.orange.rabbit";

再次執行生產者,可以發現消費者1和消費者2都收到了訊息
在這裡插入圖片描述
在這裡插入圖片描述

修改生產者路由key

 String routerKey = "lazy";

在這裡插入圖片描述
只有消費者2收到了訊息。

可以看到,我們通過使用topic型別的交換機,成功實現了多重條件進行路由選擇。