1. 程式人生 > 實用技巧 >RabbitMQ (五) 訂閱者模式之分發模式 ( fanout )

RabbitMQ (五) 訂閱者模式之分發模式 ( fanout )

前面講到了簡單佇列和工作佇列.

這兩種佇列有個非常明顯的缺點 : 生產者傳送的訊息,只能進入到一個佇列.

訊息只能進入到一個佇列就意味著訊息只能被一個消費者消費.

儘管工作佇列模式中,一個佇列中的訊息可以被多個消費者消費,但是,具體到每一條訊息,卻只能被一個消費者消費.

如果想要一個訊息被多個消費者消費,那麼生產者就必須把這條訊息傳送到多個佇列中去.

RabbitMQ 在這個點的設計是 :

在生產者和佇列兩者之間加入了一個叫做"交換機"的東西.

生產者傳送訊息時,不直接傳送到佇列,而是傳送到"交換機"(其實簡單佇列和工作佇列也是這樣的...前面的文章有提到,它們用的是預設的交換機).

"交換機"再根據宣告的型別(fanout,direct,topic,headers),轉發給符合要求的佇列.

這裡有個非常重要的知識點:

交換機只是一個"中轉的機器",它不是一個訊息佇列,它沒有儲存訊息的能力.這點很重要!

這意味著,當生產者把訊息傳送給某個交換機時,如果這時候,這個交換機沒有被任何佇列繫結,那麼這些訊息將會丟失!

這種利用交換機,將訊息"傳送"到多個佇列的模式叫做 : 訂閱者模式.

這篇文章主要介紹訂閱者模式中的分發模式,

這種模式下,訊息會被所有消費者消費.也就是說,只要是"繫結"到某個交換機的佇列,都會收到生產者傳送到該交換機的訊息.

生產者

    public class Producer
    {
        /// <summary>
        /// 交換機名稱
        /// </summary>
        private const string ExchangeName = "test_exchange_fanout";

        public static void Send()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();

            //宣告交換機,第2個引數為交換機型別
            channel.ExchangeDeclare(ExchangeName, "fanout", false, false, null);

            for (int i = 0; i < 50; i++)
            {
                string msg = "hello world " + i;
                //第2個引數為路由鍵,這種模式顯然不需要路由鍵了,因為我們是把訊息傳送到所有繫結到該交換機的佇列.
                channel.BasicPublish(ExchangeName, "", null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"send {msg}");
            }
            channel.Close();
            connection.Close();
        }
    }

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_exchange1_queue";
        private const string ExchangeName = "test_exchange_fanout";

        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
            channel.QueueDeclare(QueueName, false, false, false, null);

            //將佇列繫結到交換機上
            channel.QueueBind(QueueName, ExchangeName, "", null);

            //新增消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + str);
            };

            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

消費者2

只有這兩句不一樣

        private const string QueueName = "test_exchange2_queue";

        Console.WriteLine("consumer2 : " + str);

執行結果就不上圖.