RabbitMQ(三):Hello RabbitMQ
一、概念
RabbitMQ官網介紹:
RabbitMQ是訊息代理,用來接收並轉發訊息。可以把RabbitMQ看作是郵局。將要郵寄的郵件放在郵箱中,確保郵遞員會把郵件傳遞給接收者。
二、模型
"P"—producer:生產者。建立訊息後傳送到佇列(queue)中。
queue:佇列。訊息儲存在佇列中,佇列容量僅受主機的記憶體和磁碟約束,基本上是一個無限的緩衝區。多個生產者(producers)能夠把訊息傳送給同一個佇列,同樣,多個消費者(consumers)也能從同一個佇列(queue)中獲取資料。
“C”—consumers:消費者,一個等待獲取訊息的程式。
生產者、佇列和消費者不必位於同一臺機器上。一個應用程式可以是生產者,也可以是消費者。
三、RabbitMQ簡單使用
編寫兩個控制檯程式,傳送單個訊息的生產者和接收訊息的消費者。引用RabbitMQ.Client.dll。
- 生產者:
1 //Send.cs 2 static void Main(string[] args) 3 { 4 //例項化連線工廠 5 var factory = new ConnectionFactory() 6 { 7 HostName = "localhost", //RabbitMQ服務在本地執行 8 UserName = "guest", //使用者名稱 9 Password = "guest" //密碼 10 }; 11 //建立連線 12 using (var connection = factory.CreateConnection()) 13 { 14 //建立通道 15 using (var channel = connection.CreateModel()) 16 { 17 //申明一個名為hello的佇列 18 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); 19 //建立傳輸的訊息內容 20 string message = "hello rabbitmq"; 21 //構建byte訊息資料包 22 var body = Encoding.UTF8.GetBytes(message); 23 //傳送資料包 24 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); 25 Console.WriteLine("Sent: {0}", message); 26 Console.ReadLine(); 27 } 28 } 29 }
- 建立一個連線工廠,設定目標(如果是在本機,所以設定為localhost;如果RabbitMQ不在本機,只需要設定目標機器的IP地址或者機器名稱即可),然後設定使用者名稱和密碼。
- 建立一個channel。
- 建立佇列,將訊息傳送到佇列中。訊息是以二進位制陣列形式傳輸的,所以如果訊息是實體物件,需要序列化然後轉化為二進位制陣列。
- 客戶端(消費者)傳送程式碼後,訊息會發送到訊息佇列中。
結果顯示:
可在控制檯使用rabbitmqctl list_queues來檢視所有的訊息佇列
或者通過web管理介面檢視
- 消費者:
1 //Receive.cs 2 static void Main(string[] args) 3 { 4 //例項化連線工廠 5 var factory = new ConnectionFactory() 6 { 7 HostName = "localhost", //RabbitMQ服務在本地執行 8 UserName = "guest", //使用者名稱 9 Password = "guest" //密碼 10 }; 11 12 //建立連線 13 using (var connection = factory.CreateConnection()) 14 { 15 //建立通道 16 using (var channel = connection.CreateModel()) 17 { 18 //申明一個名為hello的佇列 19 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); 20 //構造消費者例項 21 var consumer = new EventingBasicConsumer(channel); 22 //繫結訊息接收後的事件委託 23 consumer.Received += (model, ea) => 24 { 25 var body = ea.Body; 26 var message = Encoding.UTF8.GetString(body.ToArray()); 27 Console.WriteLine("Received: {0}", message); 28 }; 29 //啟動消費者 30 channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); 31 Console.ReadLine(); 32 } 33 } 34 }
結果顯示:
再查queue中的訊息
發現訊息的內容再被接收後就被刪除了。
可以看出來,生產者和消費者的程式碼前部分是一樣的。主要區別在於生產者(傳送端)呼叫channel.BasicPublish()來發送訊息;而消費者(接收端)需要例項化一個EventBasicConsumer例項來進行訊息處理。需要注意的是,兩者的佇列名稱需保持一致。
四、工作佇列
下面看一下RabbitMQ的工作佇列(Work Queues/Task Queues)。
工作佇列的好處就是能並行處理佇列,在多個工作人員之間分配任務。如果佇列中堆積了很多訊息/任務,只需要新增多個工作者就可以了。
例子:啟動兩個接收端來等待接收訊息,再開啟一個傳送端傳送訊息。
我們稍微修改上面例子中的Send程式,允許從命令列傳送任意命令:
1 //Send.cs 2 static void Main(string[] args) 3 { 4 var factory = new ConnectionFactory() 5 { 6 HostName = "localhost", 7 UserName = "guest", 8 Password = "guest" 9 }; 10 using (var connection = factory.CreateConnection()) 11 { 12 using (var channel = connection.CreateModel()) 13 { 14 channel.QueueDeclare(queue: "task_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); 15 string message = GetMessage(args); 16 var body = Encoding.UTF8.GetBytes(message); 17 var properties = channel.CreateBasicProperties(); 18 properties.Persistent = true; 19 20 channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); 21 Console.WriteLine("Send {0}", message); 22 } 23 } 24 } 25 26 private static string GetMessage(string[] args) 27 { 28 return ((args.Length > 0) ? string.Join(" ", args) : "hello rabbitmq"); 29 }
修改接收端,偽造任務的複雜度,根據訊息中“.”的個數增加對應的Sleep時間:
1 //Receive.cs 2 static void Main(string[] args) 3 { 4 var factory = new ConnectionFactory() 5 { 6 HostName = "localhost", 7 UserName = "guest", 8 Password = "guest" 9 }; 10 using (var connection = factory.CreateConnection()) 11 { 12 using (var channel = connection.CreateModel()) 13 { 14 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); 15 16 var consumer = new EventingBasicConsumer(channel); 17 consumer.Received += (model, ea) => 18 { 19 var body = ea.Body.ToArray(); 20 var message = Encoding.UTF8.GetString(body); 21 Console.WriteLine("Received {0}", message); 22 23 //模擬執行時間的假任務 24 int dot = message.Split('.').Length - 1; 25 Thread.Sleep(dot * 1000); 26 27 Console.WriteLine("Done"); 28 }; 29 channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer); 30 Console.ReadLine(); 31 } 32 } 33 }
先啟動兩個接收端等待接收訊息,然後再啟動一個傳送端開始傳送訊息。
通過cmd使用Send傳送端傳送5條訊息,每條訊息後“.”的數量代表消耗的時長:
通過兩個接收端檢視訊息的接收:
從圖中可以看出,兩個接收端按順序依次接收到訊息。第一條訊息給接收端A,第二條訊息給接收端B,第三條訊息給接收端A......以此類推。
所以在預設情況下,RabbitMQ是按順序依次將每條訊息傳送給消費者,平均每個消費者將會接收到相同數量的訊息。這種分發的方式稱為迴圈(round-robin)。
五、訊息確認
在上面的例子中,當RabbitMQ將訊息傳送給消費者,訊息就會從佇列中移除。如果這時消費者突然掛掉(比如通道關閉、連線關閉、TCP連線丟失等),就會造成訊息丟失。
為了防止訊息丟失,RabbitMQ提供了訊息確認(message acknowledgements)。當消費者成功接收訊息並處理完成,會發送一個ack(確認)到RabbitMQ,Rabbit接收到這個訊號後就會把這條已處理的訊息從佇列中刪除。如果消費者掛掉了,沒有傳送ack,RabbitMQ就會明白某個訊息沒有被正常處理,就會將該訊息重新入隊。如果同一時間還有其他消費者線上,RabbitMQ會將這條訊息重新發送給另一個消費者。
RabbitMQ中沒有訊息超時的概念,只有當消費者關閉或失去連線時,RabbitMQ才會重新分發訊息。
訊息確認是預設開啟的。
下面通過修改Receive.cs的程式碼來看訊息確認這一機制
1 var consumer = new EventingBasicConsumer(channel); 2 3 consumer.Received += (model, ea) => 4 { 5 var body = ea.Body; 6 var message = Encoding.UTF8.GetString(body.ToArray()); 7 Console.WriteLine("Received {0}", message); 8 9 //模擬執行時間的假任務 10 int dot = message.Split('.').Length - 1; 11 Thread.Sleep(dot * 1000); 12 13 Console.WriteLine("Done"); 14 //傳送訊息確認訊號(手動確認) 15 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 16 }; 17 //啟動消費者 18 //autoAck: true 自動進行訊息確認,當消費者接收到訊息後自動傳送ack訊號,不管訊息是否處理完畢 19 //autoAck: false 關閉自動確認,通過呼叫BasicAck方法手動確認 20 channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
主要改動在於將autoAck設定為false,並在訊息處理完後呼叫BasicAck進行手動確認。
由上圖可知,訊息傳送端連續傳送五條訊息,接收端A接收並處理了第一條訊息,接收端B被迴圈分配到第二條訊息,但在處理完成之前接收端B中斷了。於是RabbitMQ將被中斷的訊息2(Second Message)分發給接收端A。
六、訊息持久化
訊息確認確保在消費端異常時,訊息不會丟失並能夠被重新分發。但如果是RabbitMQ服務端出現異常,訊息依然會丟失。
當RabbitMQ服務端奔潰或者關閉時,它會忘記佇列和訊息。我們可以通過指定 durable:true 和 Persisent=true,來將佇列和訊息標記為持久化。
1 //Send.cs 2 //申明佇列時,指定durable: true對訊息進行持久化 3 channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); 4 string message = GetMessage(args); 5 var body = Encoding.UTF8.GetBytes(message); 6 //將訊息標誌為永續性:將IBasicProperties.Persistent設定為true 7 var properties = channel.CreateBasicProperties(); 8 properties.Persistent = true; 9 10 channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
註釋:將訊息標記為永續性並不能完全保證訊息不會丟失。當它告訴RabbitMQ將訊息儲存至磁碟,RabbitMQ接收訊息並且還沒儲存時,然後有一個很短的時間間隔,RabbitMQ可能只是將訊息儲存在快取中,並沒有寫入磁碟。所以持久化並不是一定保證的。如果需要訊息佇列持久化的強保證,可以使用Publisher Confirms。
七、公平分發
上面提到RabbitMQ時按照迴圈分發的方式進行訊息傳送,保證每個消費者收到的訊息數量。但是忽略了消費者的閒忙情況。比如有兩個消費者,奇數類的任務較繁重,偶數類訊息較為輕鬆,就會出現一個消費者一直處理耗時任務處於阻塞狀態,另一個消費者一直處理簡單任務處於空閒狀態,但是它們接收的任務量卻是一致的。
可以通過在消費者端使用BasicQos方法,設定prefetchCount:1,告知RabbitMQ,在未接收到消費者的訊息確認時不再分發訊息,避免消費者一直處於忙碌狀態。
1 //Receive.cs 2 //申明佇列 3 channel.QueueDeclare(queue: "task_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); 4 //設定prefetchCount:1告知RabbitMQ,在未接收到消費者的訊息確認時不再分發訊息 5 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
註釋:如果是所有的消費者都處於忙碌狀態,佇列可能會被塞滿。需要採取其他方法,比如增加更多的消費者。
參考:
https://www.rabbitmq.com/getstarted.html
https://stackoverflow.com/questions/61374796/c-sharp-convert-readonlymemorybyte-to-byte