1. 程式人生 > 實用技巧 >RabbitMQ (六) 訂閱者模式之路由模式 ( direct )

RabbitMQ (六) 訂閱者模式之路由模式 ( direct )

路由模式下,生產者傳送訊息時需要指定一個路由鍵(routingKey),交換機只會把訊息轉發給包含該路由鍵的佇列

這裡,我們改變一下宣告交換機的方式.

我們通過管理後臺新增一個交換機.

新增後,生產者和消費者的程式碼中就不需要再宣告交換機了.同樣,也可以通過管理後臺新增佇列,那麼程式碼中也不需要宣告隊列了.

生產者

    public class Producer
    {

        private const string ExchangeName = "test_exchange_direct";

        public static void Send()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();  
            string msg = "hello world ";

            //把訊息傳送到交換機,交換機再轉發到包含路由鍵"refuge"的佇列.
            channel.BasicPublish(ExchangeName, "refuge", null, Encoding.Default.GetBytes(msg));
            Console.WriteLine($"send {msg}");
            
            channel.Close();
            connection.Close();
        }
    }

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_exchange1_queue";
        private const string ExchangeName = "test_exchange_direct";

        public static void Receive()
        {
            //獲取連線
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //建立通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //宣告佇列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //宣告交換機
            //channel.ExchangeDeclare(ExchangeName, "direct", false, false, null);

            //將佇列繫結到交換機上,路由鍵為"wjire"
            channel.QueueBind(QueueName, ExchangeName, "wjire", null);

            //新增消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + str);
            };

            //監聽佇列
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

消費者2

    public class Consumer2
    {
        private const string QueueName = "test_exchange2_queue";
        private const string ExchangeName = "test_exchange_direct";
        public static void Receive()
        {
            //獲取連線
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //建立通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //宣告佇列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //宣告交換機
            //channel.ExchangeDeclare(ExchangeName, "direct", false, false, null);

            //將佇列繫結到交換機上,該佇列匹配兩個路由鍵,"refuge"和"wjire"
            channel.QueueBind(QueueName, ExchangeName, "refuge", null);
            channel.QueueBind(QueueName, ExchangeName, "wjire", null);

            //新增消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("         consumer2 : " + str);
            };

            //監聽佇列
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

執行結果:

可以看到,只有消費者2消費了訊息.