RabbitMQ教程C#版 - 工作隊列
先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost
標準端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。從哪裏獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯系我們。
工作隊列#
(使用 .NET Client)
在 教程[1] 中,我們編寫了兩個程序,用於從一個指定的隊列發送和接收消息。在本文中,我們將創建一個工作隊列,用於在多個工作線程間分發耗時的任務。
工作隊列(又名:任務隊列)背後的主要想法是避免立即執行資源密集型、且必須等待其完成的任務。相反的,我們把這些任務安排在稍後完成。我們可以將任務封裝為消息並把它發送到隊列中,在後臺運行的工作進程將從隊列中取出任務並最終執行。當您運行多個工作線程,這些任務將在這些工作線程之間共享。
這個概念在Web應用程序中特別有用,因為在一個 HTTP 請求窗口中無法處理復雜的任務。
準備#
我們將略微修改上一個示例中的Send程序,以其可以在命令行發送任意消息。
這個程序將調度任務到我們的工作隊列中,所以讓我們把它命名為NewTask
:
像 教程[1]一樣,我們需要生成兩個項目:
Copydotnet new console --name NewTask
mv NewTask/Program.cs NewTask/NewTask.cs
dotnet new console --name Worker
mv Worker/Program.cs Worker/Worker.cs
cd NewTask
dotnet add package RabbitMQ.Client
dotnet restore
cd ../Worker
dotnet add package RabbitMQ.Client
dotnet restore
Copy
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
從命令行參數獲取消息的幫助方法:
Copyprivate static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
我們舊的Receive.cs
腳本也需要進行一些更改:它需要為消息體中的每個點模擬一秒種的時間消耗。它將處理由 RabbitMQ 發布的消息,並執行任務,因此我們把它復制到Worker
項目並修改:
// 構建消費者實例。
var consumer = new EventingBasicConsumer(channel);
// 綁定消息接收事件。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 模擬耗時操作。
int dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);
模擬虛擬任務的執行時間:
Copyint dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);
循環調度#
使用任務隊列的優點之一是能夠輕松地並行工作。如果我們正在積累積壓的工作,我們僅要增加更多的工作者,並以此方式可以輕松擴展。
首先,我們嘗試同時運行兩個Worker
實例。他們都會從隊列中獲取消息,但究竟如何?讓我們來看看。
您需要打開三個控制臺,兩個運行Worker
程序,這些控制臺作為我們的兩個消費者 - C1和C2。
# shell 1
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
Copy
# shell 2
cd Worker
dotnet run
# => [*] Waiting for messages. To exit press CTRL+C
在第三個控制臺中,我們將發布一些新的任務。一旦你已經運行了消費者,你可以嘗試發布幾條消息:
Copy# shell 3
cd NewTask
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
讓我們看看有什麽發送到了我們的Worker
程序:
# shell 1
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received ‘First message.‘
# => [x] Received ‘Third message...‘
# => [x] Received ‘Fifth message.....‘
Copy
# shell 2
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received ‘Second message..‘
# => [x] Received ‘Fourth message....‘
默認情況下,RabbitMQ 會按順序將每條消息發送給下一個消費者。消費者數量平均的情況下,每個消費者將會獲得相同數量的消息。這種分配消息的方式稱為循環(Round-Robin)。請嘗試開啟三個或更多的Worker
程序來驗證。
消息確認#
處理一項任務可能會需要幾秒鐘的時間。如果其中一個消費者開啟了一項長期的任務並且只完成了部分就掛掉了,您可能想知道會發生什麽?在我們當前的代碼中,一旦 RabbitMQ 把消息分發給了消費者,它會立即將這條消息標記為刪除。在這種情況下,如果您停掉某一個 Worker,我們將會丟失這條正在處理的消息,也將丟失所有分發到該 Worker 但尚未處理的消息。
但是我們不想丟失任何一個任務。如果一個 Worker 掛掉了,我們希望這個任務能被重新分發給其他 Worker。
為了確保消息永遠不會丟失,RabbitMQ 支持 消息確認 機制。消費者回發一個確認信號 Ack(nowledgement) 給 RabbitMQ,告訴它某個消息已經被接收、處理並且可以自由刪除它。
如果一個消費者在還沒有回發確認信號之前就掛了(其通道關閉,連接關閉或者 TCP 連接丟失),RabbitMQ 會認為該消息未被完全處理,並將其重新排隊。如果有其他消費者同時在線,該消息將會被會迅速重新分發給其他消費者。這樣,即便 Worker 意外掛掉,也可以確保消息不會丟失。
沒有任何消息會超時;當消費者死亡時,RabbitMQ 將會重新分發消息。即使處理消息需要非常非常長的時間也沒關系。
默認情況下,手動消息確認 模式是開啟的。在前面的例子中,我們通過將autoAck
(“自動確認模式”)參數設置為true
來明確地關閉手動消息確認模式。一旦完成任務,是時候刪除這個標誌並且從 Worker 手動發送一個恰當的確認信號給RabbitMQ。
// 構建消費者實例。
var consumer = new EventingBasicConsumer(channel);
// 綁定消息接收事件。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 模擬耗時操作。
int dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
// 手動發送消息確認信號。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// autoAck:false - 關閉自動消息確認,調用`BasicAck`方法進行手動消息確認。
// autoAck:true - 開啟自動消息確認,當消費者接收到消息後就自動發送 ack 信號,無論消息是否正確處理完畢。
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
使用上面這段代碼,我們可以確定的是,即使一個 Worker 在處理消息時,我們通過使用CTRL + C
來終止它,也不會丟失任何消息。Worker 掛掉不久,所有未確認的消息將會被重新分發。
忘記確認
Copy
遺漏BasicAck
是一個常見的錯誤。這是一個很簡單的錯誤,但導致的後果卻是嚴重的。當客戶端退出時(看起來像是隨機分發的),消息將會被重新分發,但是RabbitMQ會吃掉越來越多的內存,因為它不能釋放未確認的消息。
為了調試這種錯誤,您可以使用rabbitmqctl
來打印messages_unacknowledged
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,刪除
Copysudo
:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化#
我們已經學習了如何確保即使消費者掛掉,任務也不會丟失。但是如果 RabbitMQ 服務器停止,我們的任務還是會丟失。
當 RabbitMQ 退出或崩潰時,它會忘記已存在的隊列和消息,除非告訴它不要這樣做。為了確保消息不會丟失,有兩件事是必須的:我們需要將隊列和消息標記為持久。
首先,我們需要確保 RabbitMQ 永遠不會丟失我們的隊列。為了做到這一點,我們需要把隊列聲明是持久的(Durable):
Copy// 聲明隊列,通過指定 durable 參數為 true,對消息進行持久化處理。
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
雖然這個命令本身是正確的,但是它在當前設置中不會起作用。那是因為我們已經定義過一個名為hello
的隊列,並且這個隊列不是持久化的。RabbitMQ 不允許使用不同的參數重新定義已經存在的隊列,並會向嘗試執行該操作的程序返回一個錯誤。但有一個快速的解決辦法 - 讓我們用不同的名稱聲明一個隊列,例如task_queue
:
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
註意,該聲明隊列QueueDeclare
方法的更改需要同時應用於生產者和消費者代碼。
此時,我們可以確定的是,即使 RabbitMQ 重新啟動,task_queue
隊列也不會丟失。現在我們需要將我們的消息標記為持久的(Persistent) - 通過將IBasicProperties.Persistent
設置為true
。
// 將消息標記為持久性。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
關於消息持久性的說明
將消息標記為Persistent
並不能完全保證消息不會丟失。盡管它告訴 RabbitMQ 將消息保存到磁盤,但當 RabbitMQ 接收到消息並且尚未保存消息時仍有一段時間間隔。此外,RabbitMQ 不會為每條消息執行fsync(2)
- 它可能只是保存到緩存中,並沒有真正寫入磁盤。消息的持久化保證並不健壯,但對於簡單的任務隊列來說已經足夠了。如果您需要一個更加健壯的保證,可以使用 發布者確認。
公平調度#
您可能已經註意到調度仍然無法完全按照我們期望的方式工作。例如,在有兩個 Worker 的情況下,假設所有奇數消息都很龐大、偶數消息都很輕量,那麽一個 Worker 將會一直忙碌,而另一個 Worker 幾乎不做任何工作。是的,RabbitMQ 並不知道存在這種情況,它仍然會平均地分發消息。
發生這種情況是因為 RabbitMQ 只是在消息進入隊列後就將其分發。它不會去檢查每個消費者所擁有的未確認消息的數量。它只是盲目地將第 n 條消息分發給第 n 位消費者。
為了改變上述這種行為,我們可以使用參數設置prefetchCount = 1
的basicQos
方法。
這就告訴 RabbitMQ 同一時間不要給一個 Worker 發送多條消息。或者換句話說,不要向一個 Worker 發送新的消息,直到它處理並確認了前一個消息。
相反,它會這個消息調度給下一個不忙碌的 Worker。
channel.BasicQos(0, 1, false);
關於隊列大小的說明
如果所有的 Worker 都很忙,您的隊列可能會被填滿。請留意這一點,可以嘗試添加更多的 Worker,或者使用其他策略。
組合在一起#
我們NewTask.cs
類的最終代碼:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main(string[] args)
{
// 實例化連接工廠。
var factory = new ConnectionFactory() { HostName = "localhost" };
// 創建連接、信道。
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// 聲明隊列,標記為持久性。
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 獲取發送消息。
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
// 將消息標記為持久性。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// 發送數據包。
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
(NewTask.cs 源碼)
還有我們的Worker.cs
:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main()
{
// 實例化連接工廠。
var factory = new ConnectionFactory() { HostName = "localhost" };
// 創建連接、信道。
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// 聲明隊列,標記為持久性。
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
// 告知 RabbitMQ,在未收到當前 Worker 的消息確認信號時,不再分發給消息,確保公平調度。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
// 構建消費者實例。
var consumer = new EventingBasicConsumer(channel);
// 綁定消息接收事件。
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
// 模擬耗時操作。
int dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
// 手動發送消息確認信號。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
(Worker.cs 源碼)
使用消息確認機制和BasicQ
您可以創建一個工作隊列。即使 RabbitMQ 重新啟動,通過持久性選項也可讓任務繼續存在。
有關IModel
方法和IBasicProperties
的更多信息,您可以在線瀏覽 RabbitMQ .NET客戶端API參考。
現在,我們可以繼續閱讀 教程[3],學習如何向多個消費者發送相同的消息。
寫在最後#
本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為準。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。
- 原文鏈接:RabbitMQ tutorial - Work Queues
- 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.2、Visual Studio Code
- 最後更新:2018-04-03
RabbitMQ教程C#版 - 工作隊列