RabbitMQ (五) 訂閱者模式之分發模式 ( fanout )
阿新 • • 發佈:2020-12-15
前面講到了簡單佇列和工作佇列.
這兩種佇列有個非常明顯的缺點 : 生產者傳送的訊息,只能進入到一個佇列.
訊息只能進入到一個佇列就意味著訊息只能被一個消費者消費.
儘管工作佇列模式中,一個佇列中的訊息可以被多個消費者消費,但是,具體到每一條訊息,卻只能被一個消費者消費.
如果想要一個訊息被多個消費者消費,那麼生產者就必須把這條訊息傳送到多個佇列中去.
RabbitMQ 在這個點的設計是 :
在生產者和佇列兩者之間加入了一個叫做"交換機"的東西.
生產者傳送訊息時,不直接傳送到佇列,而是傳送到"交換機"(其實簡單佇列和工作佇列也是這樣的...前面的文章有提到,它們用的是預設的交換機).
"交換機"再根據宣告的型別(fanout,direct,topic,headers),轉發給符合要求的佇列.
這裡有個非常重要的知識點:
交換機只是一個"中轉的機器",它不是一個訊息佇列,它沒有儲存訊息的能力.這點很重要!
這意味著,當生產者把訊息傳送給某個交換機時,如果這時候,這個交換機沒有被任何佇列繫結,那麼這些訊息將會丟失!
這種利用交換機,將訊息"傳送"到多個佇列的模式叫做 : 訂閱者模式.
這篇文章主要介紹訂閱者模式中的分發模式,
這種模式下,訊息會被所有消費者消費.也就是說,只要是"繫結"到某個交換機的佇列,都會收到生產者傳送到該交換機的訊息.
生產者
public class Producer { /// <summary> /// 交換機名稱 /// </summary> private const string ExchangeName = "test_exchange_fanout"; public static void Send() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); //宣告交換機,第2個引數為交換機型別 channel.ExchangeDeclare(ExchangeName, "fanout", false, false, null); for (int i = 0; i < 50; i++) { string msg = "hello world " + i; //第2個引數為路由鍵,這種模式顯然不需要路由鍵了,因為我們是把訊息傳送到所有繫結到該交換機的佇列. channel.BasicPublish(ExchangeName, "", null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); } channel.Close(); connection.Close(); } }
消費者1
public class Consumer1 { private const string QueueName = "test_exchange1_queue"; private const string ExchangeName = "test_exchange_fanout"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare(QueueName, false, false, false, null); //將佇列繫結到交換機上 channel.QueueBind(QueueName, ExchangeName, "", null); //新增消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //註冊事件 consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer1 : " + str); }; channel.BasicConsume(QueueName, true, "", false, false, null, consumer); } }
消費者2
只有這兩句不一樣
private const string QueueName = "test_exchange2_queue"; Console.WriteLine("consumer2 : " + str);
執行結果就不上圖.