RabbitMQ --- Work Queues(工作隊列)
目錄
RabbitMQ --- Hello Mr.Tua
前言
Work Queues 即工作隊列,它表示一個 Producer 對應多個 Consumer,包括兩種分發模式:輪循分發(Round-robin)和公平分發(Fair dispatch)。旨在為了避免立即執行任務時出現占用很多資源和時間卻又必須等待完成的現象。
原理分析: Producer 把工作任務轉化為消息發送給隊列,當後臺有一個 Consumer 進程在運行時,它會不間斷地從隊列中取出消息來執行;當後臺有多個 Consumer 進程在運行時,它們會不間斷地從隊列中取出消息采取並行執行的方式以提高效率。
輪循分發(Round-robin )
我修改了第一篇文章中的代碼,用線程來模擬處理消息耗時的場景,分別在10個消息的末尾增加符號“>”,每個符號“>”表示該消息在線程中執行需要耗時1秒,每個消息處理完畢時以“OK”表示結束。
Producer 代碼片段:
for (int m = 0; m < 10; m++) { string marks = string.Empty; for (int n = 0; n <= m; n++) { marks += ">"; }string msg = "Mr.Tua" + marks + marks.Length + "s"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish ( exchange: string.Empty, routingKey: "Tua", basicProperties: null, body: body ); Console.WriteLine("Producer sent message: {0}", msg); }
Consumer 代碼片段:
consumer.Received += (sender, e) => { var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received message: {0}", msg);
Thread.Sleep(marks * 1000); Console.WriteLine("OK"); };
Producer 控制臺(後啟動運行):
Consumer 控制臺(先啟動運行 x 2 ):
可以看到 Producer 按照順序將每個消息發送給下一個 Consumer (一次性平均分配),每個 Consumer 得到相等數量的消息,當中不用考慮處理消息時需要耗費多少時間,也就是說不關心 Consumer 是否繁忙或空閑,這種默認的分發模式稱為輪循分發(Round-robin)。
消息應答(Message acknowledgment)
如果某個 Consumer 在處理消息時由於各種原因掛了導致 Producer 沒有收到消息處理完成時的應答,那麽就會丟失 Consumer 正在處理和沒有處理的消息。
兩個 Consumer 同時運行的過程中我關閉了其中一個,可以看到下面的 Consumer 完成了第2個消息,丟失了第4(未處理完畢)、6、8、10個消息。
在這種情況下如何保證消息不丟失呢?
消息應答(Message acknowledgment):如果 Consumer 掛了沒有發送應答,Producer 會重新轉發給其它的 Consumer 以保證不丟失消息。
修改 Consumer 代碼:
var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received message: {0}", msg); Thread.Sleep(marks * 1000); Console.WriteLine("OK"); //每個消息處理完畢時手動發送消息應答 channel.BasicAck ( deliveryTag: e.DeliveryTag, //該消息的Index multiple: false//是否批量應答,true:批量應答所有小於該deliveryTag的消息 );
channel.BasicConsume ( queue: "Tua", noAck: false,//手動應答 consumer: consumer );
可以看到雖然下面的 Consumer 掛了,但是 Producer 重新把消息發給了上面的 Consumer 去處理。
消息持久化(Message durability)
現在已經知道當 Consumer 掛了不丟失消息的解決方案,可是 RabbitMQ 服務要是掛了會導致所有的隊列和消息丟失,這種情況該怎麽辦呢?
消息持久化(Message durability):讓所有的隊列和消息都開啟持久化功能,將隊列和消息都保存在磁盤上以達到持久化的目的。
另外還有一種為了解決事務機制性能開銷大(導致吞吐量下降)而提出的更強大的消息持久化的方式叫做 Publisher Confirm,這裏不作討論。
修改 Producer 代碼:
channel.QueueDeclare ( queue: "Tua", durable: true,//開啟隊列持久化 exclusive: false, autoDelete: false, arguments: null );
var basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true;//開啟消息持久化 channel.BasicPublish ( exchange: string.Empty, routingKey: "Tua", basicProperties: basicProperties, body: body );
修改 Consumer 代碼:
channel.QueueDeclare ( queue: "Tua", durable: true,//開啟隊列持久化 exclusive: false, autoDelete: false, arguments: null );
運行程序時報出一個異常錯誤:
這是因為修改了代碼 durable: true,開啟了隊列的持久化,然而 RabbitMQ 是不允許使用不同的參數重新定義一個已有的同名隊列。
兩種方法可以解決:
1.重新定義一個不同名的隊列;
2.刪除已有的同名隊列。
第一種方法沒有什麽好說的,這裏說第二種方法,打開 RabbitMQ 管理平臺刪除已有的同名隊列:
測試步驟:首先啟動 Producer ---> 關閉 RabbitMQ 服務 ---> 啟動 RabbitMQ 服務 ---> 最後啟動 Consumer。
可以看到隊列和消息都木有丟失,這裏就不再上圖了。
公平分發(Fair dispatch)
在介紹輪循分發(Round-robin)時有提到它是不關心 Consumer 是否繁忙或空閑的,但是這樣很可能就會出現有的 Consumer 勞累過度趕腳身體被掏空,而有的 Consumer 悠閑自得趕腳無用武之地的問題,那該怎麽辦呢?
公平分發(Fair dispatch):不會同時給一個 Consumer 發送多個新消息,只有在 Consumer 空閑的時候才會給它發送一個新消息。
修改 Consumer 代碼:
//請求服務的特殊設置 channel.BasicQos ( prefetchSize: 0,//服務傳送消息的最大容量,0表示無限制 prefetchCount: 1,//服務傳送消息的最大數量,0表示無限制 global: false//false:將以上的設置應用於Consumer級別,true:將以上的設置應用於Channel級別 );
為了便於演示,我把 Producer 發送消息的順序改為從10到1。
可以看到當 Consumer 空閑的時候才會給它發送一個新消息,而且在公平分發(Fair dispatch)模式下支持動態增加 Consumer ,使得新加的 Consumer 可以立即處理還沒有發送出去的消息。
反觀在默認的輪循分發(Round-robin)模式下已經將消息一次性平均分配完畢,就算是動態增加了 Consumer 也然並卵。。。
示例代碼
using RabbitMQ.Client; using System; using System.Text; namespace WorkQueuesProducer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory { HostName = "192.168.31.212", UserName = "Tua", Password = "Tua", Port = 5672 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare ( queue: "Tua", durable: true, exclusive: false, autoDelete: false, arguments: null ); for (int m = 0; m < 10; m++) { string marks = string.Empty; for (int n = 0; n <= m; n++) { marks += ">"; } string msg = "Mr.Tua" + marks + marks.Length + "s"; var body = Encoding.UTF8.GetBytes(msg); var basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true; channel.BasicPublish ( exchange: string.Empty, routingKey: "Tua", basicProperties: basicProperties, body: body ); Console.WriteLine("Producer sent message: {0}", msg); } Console.ReadLine(); } } } } }
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Linq; using System.Text; using System.Threading; namespace WorkQueueConsumer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare ( queue: "Tua", durable: true, exclusive: false, autoDelete: false, arguments: null ); channel.BasicQos ( prefetchSize: 0, prefetchCount: 1, global: false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received message: {0}", msg); Thread.Sleep(marks * 1000); Console.WriteLine("OK"); channel.BasicAck ( deliveryTag: e.DeliveryTag, multiple: false ); }; channel.BasicConsume ( queue: "Tua", noAck: false, consumer: consumer ); Console.ReadLine(); } } } } }
RabbitMQ --- Work Queues(工作隊列)