RabbitMQ (五) : 訂閱者模式之分發模式 ( fanout )
阿新 • • 發佈:2019-02-05
class 是把 end con als connect 註冊事件 () inf
前面講到了簡單隊列和工作隊列.
這兩種隊列有個非常明顯的缺點 : 生產者發送的消息,只能進入到一個隊列.
消息只能進入到一個隊列就意味著消息只能被一個消費者消費.
盡管工作隊列模式中,一個隊列中的消息可以被多個消費者消費,但是,具體到每一條消息,卻只能被一個消費者消費.
如果想要一個消息被多個消費者消費,那麽生產者就必須把這條消息發送到多個隊列中去.
RabbitMQ 在這個點的設計是 :
在生產者和隊列兩者之間加入了一個叫做"交換機"的東西.
生產者發送消息時,不直接發送到隊列,而是發送到"交換機"(其實簡單隊列和工作隊列也是這樣的...前面的文章有提到,它們用的是默認的交換機).
"交換機"再根據聲明的類型(fanout,direct,topic,headers),轉發給符合要求的隊列.
這裏有個非常重要的知識點:
交換機只是一個"中轉的機器",它不是一個消息隊列,它沒有存儲消息的能力.這點很重要!
這意味著,當生產者把消息發送給某個交換機時,如果這時候,這個交換機沒有被任何隊列綁定,那麽這些消息將會丟失!
這種利用交換機,將消息"發送"到多個隊列的模式叫做 : 訂閱者模式.
這篇文章主要介紹訂閱者模式中的分發模式,
這種模式下,消息會被所有消費者消費.也就是說,只要是"綁定"到某個交換機的隊列,都會收到生產者發送到該交換機的消息.
生產者
public class Producer { /// <summary> ///交換機名稱 /// </summary> private const string ExchangeName = "test_exchange_fanout"; public static void Send() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); //聲明交換機,第2個參數為交換機類型channel.ExchangeDeclare(ExchangeName, "fanout", false, false, null); for (int i = 0; i < 50; i++) { string msg = "hello world " + i; //第2個參數為路由鍵,這種模式顯然不需要路由鍵了,因為我們是把消息發送到所有綁定到該交換機的隊列. channel.BasicPublish(ExchangeName, "", null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); } channel.Close(); connection.Close(); } }
消費者1
public class Consumer1 { private const string QueueName = "test_exchange1_queue"; private const string ExchangeName = "test_exchange_fanout"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare(QueueName, false, false, false, null); //將隊列綁定到交換機上 channel.QueueBind(QueueName, ExchangeName, "", null); //添加消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //註冊事件 consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer1 : " + str); }; channel.BasicConsume(QueueName, true, "", false, false, null, consumer); } }
消費者2
只有這兩句不一樣
private const string QueueName = "test_exchange2_queue"; Console.WriteLine("consumer2 : " + str);
運行結果就不上圖.
RabbitMQ (五) : 訂閱者模式之分發模式 ( fanout )