快速掌握RabbitMQ(三)——訊息確認、持久化、優先順序的C#實現
1 訊息確認
在一些場合,如轉賬、付費時每一條訊息都必須保證成功的被處理。AMQP是金融級的訊息佇列協議,有很高的可靠性,這裡介紹在使用RabbitMQ時怎麼保證訊息被成功處理的。訊息確認可以分為兩種:一種是生產者傳送訊息到Broke時,Broker給生產者傳送確認回執,用於告訴生產者訊息已被成功傳送到Broker;一種是消費者接收到Broker傳送的訊息時,消費者給Broker傳送確認回執,用於告訴消費者訊息已成功被消費者接收。
下邊分別介紹生產者端和消費者端的訊息確認方法。準備條件:使用Web管理工具新增exchange、queue並繫結,bindingKey為“mykey”,如下所示:
1 生產者端訊息確認(tx機制和Confirm模式)
生產者端的訊息確認:當生產者將訊息傳送給Broker,Broker接收到訊息給生產者傳送確認回執。生產者端的訊息確認有兩種方式:tx機制和Confirm模式。
1.tx機制
tx機制可以叫做事務機制,RabbitMQ中有三個與tx機制的方法:txSelect(), txCommit()和txRollback()。 channel.txSelect() 用於將當前channel設定成transaction模式, channel.txCommit() 提交事務, channel.txRollback() 回滾事務。使用tx機制,我們首先要通過txSelect方法開啟事務,然後釋出訊息給broker伺服器了,如果txCommit提交成功了,則說明訊息成功被broker接收了;如果在txCommit執行之前broker異常崩潰或者由於其他原因丟擲異常,這個時候我們可以捕獲異常,通過txRollback回滾事務。看一個tx機制的簡單實現:
var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; //建立連線connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //傳送訊息 //在控制檯輸入訊息,按enter鍵傳送訊息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { //開啟事務機制 channel.TxSelect(); //傳送訊息 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); //事務提交 channel.TxCommit(); Console.WriteLine($"【{message}】傳送到Broke成功!"); } catch (Exception) { Console.WriteLine($"【{message}】傳送到Broker失敗!"); channel.TxRollback(); } } } } Console.ReadKey(); }
程式執行結果如下:
2 Confirm模式
C#的RabbitMQ API中,有三個與Confirm相關的方法:ConfirmSelect(),WaitForConfirms()和WaitForConfirmOrDie。 channel.ConfirmSelect() 表示開啟Confirm模式; channel.WaitForConfirms() 等待所有訊息確認,如果所有的訊息都被服務端成功接收返回true,只要有一條沒有被成功接收就返回false。 channel.WaitForConfirmsOrDie() 和WaitForConfirms作用型別,也是等待所有訊息確認,區別在於該方法沒有返回值(Void),如果有任意一條訊息沒有被成功接收,該方法會立即丟擲一個OperationInterrupedException型別異常。看一個Confirm模式的簡單實現:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; //建立連線connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //在控制檯輸入訊息,按enter鍵傳送訊息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //開啟Confirm模式 channel.ConfirmSelect(); //傳送訊息 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); //WaitForConfirms確認訊息(可以同時確認多條訊息)是否傳送成功,如果返回false表示傳送失敗,會自動重新發送 if (channel.WaitForConfirms()) { Console.WriteLine($"【{message}】傳送到Broke成功!"); } } } } Console.ReadKey(); }
程式執行結果:
2 消費者端訊息確認(自動確認和顯示確認)
從Broke傳送到消費者時,RabbitMQ提供了兩種訊息確認的方式:自動確認和顯示確認。
1 自動確認
自動確認:當RabbbitMQ將訊息傳送給消費者後,消費者端接收到訊息後,不等待訊息處理結束,立即自動回送一個確認回執。自動確認的用法十分簡單,設定消費方法的引數autoAck為true即可,我們前邊的例子都是使用的自動確認,這裡不再詳細演示,如下:
channel.BasicConsume(queue: "myqueue",autoAck: true, consumer: consumer);
注意:Broker會在接收到確認回執時刪除訊息,如果消費者接收到訊息並返回了確認回執,然後這個消費者在處理訊息時掛了,那麼這條訊息就再也找不回來了。
2 顯示確認
我們知道自動確認可能會出現訊息丟失的問題,我們不免會想到:Broker收到回執後才刪除訊息,如果可以讓消費者在接收訊息時不立即返回確認回執,等到訊息處理完成後(或者完成一部分的邏輯)再返回確認回執,這樣就保證消費端不會丟失訊息了!這正是顯式確認的思路。使用顯示確認也比較簡單,首先將Resume方法的引數autoAck設定為false,然後在消費端使用程式碼 channel.BasicAck()/BasicReject()等方法 來確認和拒絕訊息。看一個栗子:
生產者程式碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; //建立連線connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //傳送訊息 //在控制檯輸入訊息,按enter鍵傳送訊息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本釋出 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"訊息【{message}】已傳送到佇列"); } } } Console.ReadKey(); }
消費者程式碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { string message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"接受到訊息【{message}】"); //以news開頭表示是新聞型別,處理完成後確認訊息 if (message.StartsWith("news")) { //這裡處理訊息balabala Console.WriteLine($"【{message}】是新聞訊息,處理訊息並確認"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } //不以news開頭表示不是新聞型別,不進行處理,把訊息退回到queue中 else { Console.WriteLine($"【{message}】不是新聞型別,拒絕處理"); channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); } }; Console.WriteLine("消費者準備就緒...."); //第五步:處理訊息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); } } }
介紹一下程式碼中標紅的兩個方法: channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 方法用於確認訊息,deliveryTag引數是分發的標記,multiple表示是否確認多條。 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false); 方法用於拒絕訊息,deliveryTag也是指分發的標記,requeue表示訊息被拒絕後是否重新放回queue中,true表示放回queue中,false表示直接丟棄。
執行這兩個應用程式,通過生產者傳送兩條訊息,效果如下:
一些意外的情況:使用顯式確認時,如果消費者處理完訊息不傳送確認回執,那麼訊息不會被刪除,訊息的狀態一直是Unacked,這條訊息也不會再發送給其他消費者。如果一個消費者在處理訊息時尚未傳送確認回執的情況下掛掉了,那麼訊息會被重新放入佇列(狀態從Unacked變成Ready),有其他消費者存時,訊息會發送給其他消費者。
2 訊息持久化/優先順序
1 訊息持久化(Persistent)
在前邊已經介紹了exchange和queue的持久化,把exchange和queue的durable屬性設定為true,重啟rabbitmq服務時( 重啟命令:rabbitmqctl stop_app ;rabbitmqctl start_app ),exchange和queue也會恢復。我們需要注意的是:如果queue設定durable=true,rabbitmq服務重啟後佇列雖然會存在,但是佇列內的訊息會丟全部丟失。那麼怎麼實現訊息的持久化呢?實現的方法很簡單:將exchange和queue都設定durable=true,然後在訊息釋出的時候設定persistent=true即可。看一個栗子:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; //建立連線connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); string message = ""; //在控制檯輸入訊息,按enter鍵傳送訊息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //設定訊息持久化 var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: body); //WaitForConfirms確認訊息(可以同時確認多條訊息)是否傳送成功,如果返回false表示傳送失敗,會自動重新發送 Console.WriteLine($"【{message}】傳送到Broke成功!"); } } } Console.ReadKey(); }
宣告exchange和queue時設定durable=true,然後執行上邊的程式碼,傳入一條訊息。重啟rabbitmq後,exchange,queue和訊息都會恢復。我們也可以在web管理介面設定訊息持久化,如下:
2 訊息優先順序(Priority)
我們知道queue是先進先出的,即先發送的訊息,先被消費。但是在具體業務中可能會遇到要提前處理某些訊息的需求,如一個常見的需求:普通客戶的訊息按先進先出的順序處理,Vip客戶的訊息要提前處理。訊息實現優先順序控制的實現方式是:首先在宣告queue是設定佇列的x-max-priority屬性,然後在publish訊息時,設定訊息的優先順序等級即可。為了演示方便,約定所有vip客戶的資訊都以vip開頭,看一下程式碼實現:
生產者程式碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; //建立連線connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //宣告交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //宣告佇列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { //佇列優先順序最高為10,不加x-max-priority的話,計算髮布時設定了訊息的優先順序也不會生效 {"x-max-priority",10 } }); //繫結exchange和queue channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "mykey"); Console.WriteLine("生產者準備就緒...."); //一些待發送的訊息 string[] msgs = { "vip1", "hello2", "world3","common4", "vip5" }; //設定訊息優先順序 var props = channel.CreateBasicProperties(); foreach (string msg in msgs) { //vip開頭的訊息,優先順序設定為9 if (msg.StartsWith("vip")) { props.Priority = 9; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); } //其他訊息的優先順序為1 else { props.Priority = 1; channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: props, body: Encoding.UTF8.GetBytes(msg)); } } } } Console.ReadKey(); }
消費者,不需要對消費者做額外的配置,程式碼如下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在裝置ip,這裡就是本機 HostName = "127.0.0.1", UserName = "wyy",//使用者名稱 Password = "123321"//密碼 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Console.WriteLine(Encoding.UTF8.GetString(ea.Body)); }; Console.WriteLine("消費者準備就緒...."); //處理訊息 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }
執行程式,結果如下,我們看到vip開頭的訊息被率先處理了,證明優先順序是生效的
3 小結
本節簡單介紹了Rabbitmq中的訊息確認,訊息持久化,訊息優先順序的實現方式,這幾個功能在開發中會經常用到,RabbitMQ還有一些其他有用的功能,如Lazy queue模式,dead letter處理,queue的訊息條數、位元組數限制等,這裡沒有具體演示,有興趣的園友可以自己研究一下。
&n