RabbitMQ (九) : 消息確認機制之 confirm 機制
阿新 • • 發佈:2019-02-07
sel 異步 confirm lin creat 生產 不能 消息 tip
confirm 機制分串行和並行兩種.
串行
生產者
public class Producer { private const string QueueName = "test_confirm_queue"; public static void Send() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); channel.QueueDeclare(QueueName,false, false, false, null); //開啟confirm機制.註意 : 一個隊列如果之前已經設置成了事務機制,那麽不能再設置為 confirm 機制.反之亦然 channel.ConfirmSelect(); string msg = "hello world "; //發送消息 for (int i = 0; i < 10; i++) { channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i)); } //設置confirm機制的串行機制,如果消息采用的是持久化,那麽確認消息會在持久化後發出.
//可以發一批後,調用該方法;也可以每發一條調用一次.
//當然是發一批消息後調用好些!
if (channel.WaitForConfirms()) { Console.WriteLine("send is success"); }else { Console.WriteLine("send is failed"); } channel.Close(); connection.Close(); } }
並行(異步)
生產者
public class Producer { private const string QueueName = "test_confirm_queue"; //這裏一定要用 SortedSet private static readonly SortedSet<ulong> ConfirmSort = new SortedSet<ulong>(); public static void Send() { using (IConnection connection = ConnectionHelper.GetConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, false, false, false, null); channel.ConfirmSelect(); //成功 channel.BasicAcks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最後成功的一條是 : " + e.DeliveryTag); ConfirmSort.RemoveWhere(r => r <= e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 成功發送 "); ConfirmSort.Remove(e.DeliveryTag); } }; //失敗 channel.BasicNacks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最後失敗的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 發送失敗 "); } }; //發送消息 string msg = "hello world "; int i = 0; while (true) { ulong seqNo = channel.NextPublishSeqNo; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i)); ConfirmSort.Add(seqNo);//把編號加入到集合中 i++; } } } } }
RabbitMQ (九) : 消息確認機制之 confirm 機制