1. 程式人生 > 其它 >RabbitMq-topic萬用字元模式(五)

RabbitMq-topic萬用字元模式(五)

一、topics萬用字元模式:

Topic 萬用字元型別與 Direct路由 相比,都是可以根據 RoutingKey 把訊息路由到不同的佇列。只不過 Topic 型別 Exchange 可以讓佇列在繫結 Routing key 的時候使用萬用字元

1.1 topic模式的Routingkey

Routingkey 一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert

萬用字元規則:

# : 匹配一個或多個詞
* : 匹配不多不少恰好1個詞

舉例:

item.# :能夠匹配 item.insert.abc 或者 item.insert

item.* :只能匹配 item.insert

1.2 topic模式示例圖

圖解:

紅色Queue:繫結的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到

黃色Queue:繫結的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配

二、topic萬用字元模式需求

編寫生產者、消費者程式碼並測試瞭解Topics萬用字元模式的特點

2.1 步驟:

  • 生產者:傳送包含有item.insert、item.update,item.delete的3中路由key訊息
  • 消費者1:監聽的佇列繫結到交換機的路由key為:item.update,item.delete
  • 消費者2:監聽的佇列繫結到交換機的路由key為:item.*

2.2 程式碼示例

生產者程式碼Producer:

package com.study.rabbitmq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.study.rabbitmq.util.ConnectionUtil;

//topic路由模式 傳送訊息

public class Producer {
    //交換機名稱
    static final String TOPIC_EXCHANGE = "topic_exchange";
    //佇列名稱
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws Exception  {
        // 1. 建立連線
        Connection connection = ConnectionUtil.getConnection();
        // 2. 建立頻道;
        Channel channel = connection.createChannel();
        // 3.宣告交換機,引數1:交換機名稱,引數2:交換機型別(FANOUT廣播型別、direct 定向路由,topic 萬用字元)
        channel.exchangeDeclare(TOPIC_EXCHANGE , BuiltinExchangeType.TOPIC);

            //4. 傳送訊息;
                String message = "新增商品(萬用字元模式)---routingkey 為 insert";
                /**
                 * 引數1:交換機名稱;如果沒有則指定空字串(表示使用預設的交換機)
                 * 引數2:路由key,簡單模式中可以使用佇列名稱
                 * 引數3:訊息其它屬性
                 * 引數4:訊息內容
                 */
                channel.basicPublish(TOPIC_EXCHANGE,"item.insert",null,message.getBytes());
                System.out.println("已傳送訊息:" + message);

                message = "更新商品(萬用字元模式)---routingkey 為 update";
                channel.basicPublish(TOPIC_EXCHANGE,"item.update",null,message.getBytes());
                System.out.println("已傳送訊息:" + message);

                message = "刪除商品(萬用字元模式)---routingkey 為 delete";
                channel.basicPublish(TOPIC_EXCHANGE,"item.delete",null,message.getBytes());
                System.out.println("已傳送訊息:" + message);

                //5. 關閉資源
                channel.close();
                connection.close();
            }
}

消費者1Consumer1-程式碼:

package com.study.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;


/**
 * topic路由模式;消費者接收訊息
 */
public class Consumer1 {
    public static void main(String[] args) throws Exception {
//        1. 建立連線;(抽取一個獲取連線的工具類)
        Connection connection = ConnectionUtil.getConnection();
//        2. 建立頻道;
        Channel channel = connection.createChannel();
        //3 宣告交換機
        channel.exchangeDeclare(com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
//        4. 宣告佇列;
        /**
         * 引數1:佇列名稱
         * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
         * 引數3:是否獨佔本連線
         * 引數4:是否在不使用的時候佇列自動刪除
         * 引數5:其它引數
         */
        channel.queueDeclare(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, true, false, false, null);
        //5 佇列繫結到交換機上
        channel.queueBind(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,"item.update");
        channel.queueBind(com.study.rabbitmq.topic.Producer.TOPIC_QUEUE_1, com.study.rabbitmq.topic.Producer.TOPIC_EXCHANGE,"item.delete");

//        6. 建立消費者(接收訊息並處理訊息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機
                System.out.println("交換機為:" + envelope.getExchange());
                //訊息id
                System.out.println("訊息id為:" + envelope.getDeliveryTag());
                //接收到的訊息
                System.out.println("消費者1---接收到的訊息為:" + new String(body, "utf-8"));
            }
        };
//        6. 監聽佇列   (需要持續監聽佇列訊息,所以不要關閉資源)
        /**
         * 引數1:佇列名
         * 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
         * 如果設定為false則需要手動確認
         * 引數3:消費者
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_1,true,defaultConsumer);
        //不關閉資源,應該一直監聽訊息
        // channel.close();
        // connection.close();


    }

}

消費者2-Consumer2 程式碼:

package com.study.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;


/**
 * topic路由模式;消費者接收訊息
 */
public class Consumer2 {
    public static void main(String[] args) throws Exception {
//        1. 建立連線;(抽取一個獲取連線的工具類)
        Connection connection = ConnectionUtil.getConnection();
//        2. 建立頻道;
        Channel channel = connection.createChannel();
        //3 宣告交換機
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
//        4. 宣告佇列;
        /**
         * 引數1:佇列名稱
         * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
         * 引數3:是否獨佔本連線
         * 引數4:是否在不使用的時候佇列自動刪除
         * 引數5:其它引數
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
        //5 佇列繫結到交換機上
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE,"item.*");

//        6. 建立消費者(接收訊息並處理訊息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機
                System.out.println("交換機為:" + envelope.getExchange());
                //訊息id
                System.out.println("訊息id為:" + envelope.getDeliveryTag());
                //接收到的訊息
                System.out.println("消費者2---接收到的訊息為:" + new String(body, "utf-8"));
            }
        };
//    6. 監聽佇列   (需要持續監聽佇列訊息,所以不要關閉資源)
        /**
         * 引數1:佇列名
         * 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
         * 如果設定為false則需要手動確認
         * 引數3:消費者
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_2,true,defaultConsumer);
        //不關閉資源,應該一直監聽訊息
        // channel.close();
        // connection.close();


    }

}

先執行消費者1 、2 再執行生產者,
idea結果: 消費者1佇列 接收:路由key為:item.update,item.delete;消費者2佇列 接收:路由key為:item.update,item.delete,item.insert

rabbitmq後臺管理介面結果

三、topic萬用字元總結:

Topics萬用字元模式:可以根據路由key將訊息傳遞到對應路由key的佇列;

佇列繫結到交換機的路由key可以有多個;

萬用字元模式中路由key可以使用 *# ;使用了萬用字元模式之後對於路由Key的配置更加靈活。