1. 程式人生 > 實用技巧 >RabbitMQ(三):Hello RabbitMQ

RabbitMQ(三):Hello RabbitMQ

一、概念

RabbitMQ官網介紹:

RabbitMQ是訊息代理,用來接收並轉發訊息。可以把RabbitMQ看作是郵局。將要郵寄的郵件放在郵箱中,確保郵遞員會把郵件傳遞給接收者。

二、模型

"P"—producer:生產者。建立訊息後傳送到佇列(queue)中。

queue:佇列。訊息儲存在佇列中,佇列容量僅受主機的記憶體和磁碟約束,基本上是一個無限的緩衝區。多個生產者(producers)能夠把訊息傳送給同一個佇列,同樣,多個消費者(consumers)也能從同一個佇列(queue)中獲取資料。

“C”—consumers:消費者,一個等待獲取訊息的程式。

生產者、佇列和消費者不必位於同一臺機器上。一個應用程式可以是生產者,也可以是消費者。

三、RabbitMQ簡單使用

編寫兩個控制檯程式,傳送單個訊息的生產者和接收訊息的消費者。引用RabbitMQ.Client.dll

  • 生產者:
 1 //Send.cs
 2         static void Main(string[] args)
 3         {
 4             //例項化連線工廠
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = "localhost", //RabbitMQ服務在本地執行
 8                 UserName = "
guest", //使用者名稱 9 Password = "guest" //密碼 10 }; 11 //建立連線 12 using (var connection = factory.CreateConnection()) 13 { 14 //建立通道 15 using (var channel = connection.CreateModel()) 16 { 17 //
申明一個名為hello的佇列 18 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); 19 //建立傳輸的訊息內容 20 string message = "hello rabbitmq"; 21 //構建byte訊息資料包 22 var body = Encoding.UTF8.GetBytes(message); 23 //傳送資料包 24 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); 25 Console.WriteLine("Sent: {0}", message); 26 Console.ReadLine(); 27 } 28 } 29 }
  1. 建立一個連線工廠,設定目標(如果是在本機,所以設定為localhost;如果RabbitMQ不在本機,只需要設定目標機器的IP地址或者機器名稱即可),然後設定使用者名稱和密碼。
  2. 建立一個channel。
  3. 建立佇列,將訊息傳送到佇列中。訊息是以二進位制陣列形式傳輸的,所以如果訊息是實體物件,需要序列化然後轉化為二進位制陣列。
  4. 客戶端(消費者)傳送程式碼後,訊息會發送到訊息佇列中。

結果顯示:

可在控制檯使用rabbitmqctl list_queues來檢視所有的訊息佇列

或者通過web管理介面檢視

  • 消費者:
 1 //Receive.cs
 2         static void Main(string[] args)
 3         {
 4             //例項化連線工廠
 5             var factory = new ConnectionFactory()
 6             {
 7                 HostName = "localhost", //RabbitMQ服務在本地執行
 8                 UserName = "guest", //使用者名稱
 9                 Password = "guest"  //密碼
10             };
11 
12             //建立連線
13             using (var connection = factory.CreateConnection())
14             {
15                 //建立通道
16                 using (var channel = connection.CreateModel())
17                 {
18                     //申明一個名為hello的佇列
19                     channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
20                     //構造消費者例項
21                     var consumer = new EventingBasicConsumer(channel);
22                     //繫結訊息接收後的事件委託
23                     consumer.Received += (model, ea) =>
24                     {
25                         var body = ea.Body;
26                         var message = Encoding.UTF8.GetString(body.ToArray());
27                         Console.WriteLine("Received: {0}", message);
28                     };
29                     //啟動消費者
30                     channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
31                     Console.ReadLine();
32                 }
33             }
34         }

結果顯示:

再查queue中的訊息

發現訊息的內容再被接收後就被刪除了。

可以看出來,生產者和消費者的程式碼前部分是一樣的。主要區別在於生產者(傳送端)呼叫channel.BasicPublish()來發送訊息;而消費者(接收端)需要例項化一個EventBasicConsumer例項來進行訊息處理。需要注意的是,兩者的佇列名稱需保持一致

四、工作佇列

下面看一下RabbitMQ的工作佇列(Work Queues/Task Queues)。

工作佇列的好處就是能並行處理佇列,在多個工作人員之間分配任務。如果佇列中堆積了很多訊息/任務,只需要新增多個工作者就可以了。

例子:啟動兩個接收端來等待接收訊息,再開啟一個傳送端傳送訊息。

我們稍微修改上面例子中的Send程式,允許從命令列傳送任意命令:

 1  //Send.cs
 2         static void Main(string[] args)
 3         {
 4             var factory = new ConnectionFactory()
 5             {
 6                 HostName = "localhost", 
 7                 UserName = "guest", 
 8                 Password = "guest" 
 9             };
10             using (var connection = factory.CreateConnection())
11             {
12                 using (var channel = connection.CreateModel())
13                 {
14                     channel.QueueDeclare(queue: "task_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
15                     string message = GetMessage(args);
16                     var body = Encoding.UTF8.GetBytes(message);
17                     var properties = channel.CreateBasicProperties();
18                     properties.Persistent = true;
19 
20                     channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
21                     Console.WriteLine("Send {0}", message);
22                 }
23             }
24         }
25 
26         private static string GetMessage(string[] args)
27         {
28             return ((args.Length > 0) ? string.Join(" ", args) : "hello rabbitmq");
29         }

修改接收端,偽造任務的複雜度,根據訊息中“.”的個數增加對應的Sleep時間:

 1  //Receive.cs
 2         static void Main(string[] args)
 3         {
 4             var factory = new ConnectionFactory()
 5             {
 6                 HostName = "localhost",
 7                 UserName = "guest",
 8                 Password = "guest"
 9             };
10             using (var connection = factory.CreateConnection())
11             {
12                 using (var channel = connection.CreateModel())
13                 {
14                     channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
15 
16                     var consumer = new EventingBasicConsumer(channel);
17                     consumer.Received += (model, ea) =>
18                     {
19                         var body = ea.Body.ToArray();
20                         var message = Encoding.UTF8.GetString(body);
21                         Console.WriteLine("Received {0}", message);
22 
23                         //模擬執行時間的假任務
24                         int dot = message.Split('.').Length - 1;
25                         Thread.Sleep(dot * 1000);
26 
27                         Console.WriteLine("Done");
28                     };
29                     channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
30                     Console.ReadLine();
31                 }
32             }
33         }

先啟動兩個接收端等待接收訊息,然後再啟動一個傳送端開始傳送訊息。

通過cmd使用Send傳送端傳送5條訊息,每條訊息後“.”的數量代表消耗的時長:

通過兩個接收端檢視訊息的接收:

從圖中可以看出,兩個接收端按順序依次接收到訊息。第一條訊息給接收端A,第二條訊息給接收端B,第三條訊息給接收端A......以此類推。

所以在預設情況下,RabbitMQ是按順序依次將每條訊息傳送給消費者,平均每個消費者將會接收到相同數量的訊息。這種分發的方式稱為迴圈(round-robin)。

五、訊息確認

在上面的例子中,當RabbitMQ將訊息傳送給消費者,訊息就會從佇列中移除。如果這時消費者突然掛掉(比如通道關閉、連線關閉、TCP連線丟失等),就會造成訊息丟失。

為了防止訊息丟失,RabbitMQ提供了訊息確認(message acknowledgements)。當消費者成功接收訊息並處理完成,會發送一個ack(確認)到RabbitMQ,Rabbit接收到這個訊號後就會把這條已處理的訊息從佇列中刪除。如果消費者掛掉了,沒有傳送ack,RabbitMQ就會明白某個訊息沒有被正常處理,就會將該訊息重新入隊。如果同一時間還有其他消費者線上,RabbitMQ會將這條訊息重新發送給另一個消費者。

RabbitMQ中沒有訊息超時的概念,只有當消費者關閉或失去連線時,RabbitMQ才會重新分發訊息。

訊息確認是預設開啟的。

下面通過修改Receive.cs的程式碼來看訊息確認這一機制

 1     var consumer = new EventingBasicConsumer(channel);
 2 
 3     consumer.Received += (model, ea) =>
 4     {
 5         var body = ea.Body;
 6         var message = Encoding.UTF8.GetString(body.ToArray());
 7         Console.WriteLine("Received {0}", message);
 8 
 9         //模擬執行時間的假任務
10         int dot = message.Split('.').Length - 1;
11         Thread.Sleep(dot * 1000);
12 
13         Console.WriteLine("Done");
14         //傳送訊息確認訊號(手動確認)
15         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
16     };
17     //啟動消費者
18     //autoAck: true  自動進行訊息確認,當消費者接收到訊息後自動傳送ack訊號,不管訊息是否處理完畢
19     //autoAck: false 關閉自動確認,通過呼叫BasicAck方法手動確認
20     channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

主要改動在於將autoAck設定為false,並在訊息處理完後呼叫BasicAck進行手動確認。

由上圖可知,訊息傳送端連續傳送五條訊息,接收端A接收並處理了第一條訊息,接收端B被迴圈分配到第二條訊息,但在處理完成之前接收端B中斷了。於是RabbitMQ將被中斷的訊息2(Second Message)分發給接收端A。

、訊息持久化

訊息確認確保在消費端異常時,訊息不會丟失並能夠被重新分發。但如果是RabbitMQ服務端出現異常,訊息依然會丟失。

當RabbitMQ服務端奔潰或者關閉時,它會忘記佇列和訊息。我們可以通過指定 durable:true 和 Persisent=true,來將佇列和訊息標記為持久化。

 1     //Send.cs
 2     //申明佇列時,指定durable: true對訊息進行持久化
 3     channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
 4     string message = GetMessage(args);
 5     var body = Encoding.UTF8.GetBytes(message);
 6     //將訊息標誌為永續性:將IBasicProperties.Persistent設定為true
 7     var properties = channel.CreateBasicProperties();
 8     properties.Persistent = true;
 9 
10     channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);

註釋:將訊息標記為永續性並不能完全保證訊息不會丟失。當它告訴RabbitMQ將訊息儲存至磁碟,RabbitMQ接收訊息並且還沒儲存時,然後有一個很短的時間間隔,RabbitMQ可能只是將訊息儲存在快取中,並沒有寫入磁碟。所以持久化並不是一定保證的。如果需要訊息佇列持久化的強保證,可以使用Publisher Confirms

、公平分發

上面提到RabbitMQ時按照迴圈分發的方式進行訊息傳送,保證每個消費者收到的訊息數量。但是忽略了消費者的閒忙情況。比如有兩個消費者,奇數類的任務較繁重,偶數類訊息較為輕鬆,就會出現一個消費者一直處理耗時任務處於阻塞狀態,另一個消費者一直處理簡單任務處於空閒狀態,但是它們接收的任務量卻是一致的。

可以通過在消費者端使用BasicQos方法,設定prefetchCount:1,告知RabbitMQ,在未接收到消費者的訊息確認時不再分發訊息,避免消費者一直處於忙碌狀態。

1     //Receive.cs
2     //申明佇列
3     channel.QueueDeclare(queue: "task_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
4     //設定prefetchCount:1告知RabbitMQ,在未接收到消費者的訊息確認時不再分發訊息
5     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

註釋:如果是所有的消費者都處於忙碌狀態,佇列可能會被塞滿。需要採取其他方法,比如增加更多的消費者。

參考:

https://www.rabbitmq.com/getstarted.html

https://stackoverflow.com/questions/61374796/c-sharp-convert-readonlymemorybyte-to-byte