RabbitMQ使用交換機處理非同步訊息佇列------分散式事務處理案例
阿新 • • 發佈:2020-08-01
RabbitMQ使用交換機處理非同步訊息佇列案例的安裝環境可以參考RabbitMQ環境準備/環境搭建,
本片在RabbitMQ環境已有的基礎上講述RabbitMQ使用交換機處理非同步訊息佇列------分散式事務處理案例具體過程
1、新建.NET Core console控制檯專案ConsoleRabbitMQ專案(生產者 productor,即產生訊息的)以及ConsoleRabbitMQ01專案(consumer消費者,即使用訊息的)
2、對控制檯專案使用NuGet程式管理包新增RabbitMQ.Client
3、控制檯專案ConsoleRabbitMQ專案的Program程式碼如下
using RabbitMQ.Client; using System; namespace ConsoleRabbitMQ { class Program { static void Main(string[] args) { Console.WriteLine("RabbitMQ 生產者開始。。。生產。。。!"); #region RabbitMQ 生產者 var connectionFactory = new ConnectionFactory() { HostName= "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 佇列持久化 channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null); //durable: true 交換機持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); //持久化訊息,告訴訊息佇列,該條訊息需要持久化和固化到磁碟 var propertyPersist = channel.CreateBasicProperties(); propertyPersist.Persistent = true; channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); #region Tx事務處理,不推薦使用,處理過程較複雜 //channel.TxSelect();//開起事務 1 //for (int i = 0; i < 100; i++) //{ // var body = System.Text.Encoding.UTF8.GetBytes($"這是釋出的資料。{i}。"); // //持久化訊息 basicProperties: propertyPersist // channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//傳送訊息給訊息佇列,之後訊息佇列收到以後會進行初持久化處理,儲存路徑C:\Users\Administrator\AppData下面的RabbitMQ,query檔案中 // System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); //} //try //{ // channel.TxCommit();//提交事務 1 //} //catch (Exception ex) //{ // //這個說明生產者傳送訊息到訊息佇列時出錯了,這裡可以記錄錯誤,也可以重試再次傳送等等處理 // Console.WriteLine($"RabbitMQ 生產者傳送訊息到訊息佇列時出錯了,錯誤資訊:{ex.Message}"); // channel.TxRollback();//回滾事務 1 //} #endregion #region Tx事務處理,推薦使用 try { channel.ConfirmSelect();//開起訊息確認模式 2 這個rabbitmq的擴充套件,可以看成一個回撥 for (int i = 0; i < 100; i++) { var body = System.Text.Encoding.UTF8.GetBytes($"這是釋出的資料。{i}。"); //持久化訊息 basicProperties: propertyPersist channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//傳送訊息給訊息佇列,之後訊息佇列收到以後會進行初持久化處理,儲存路徑C:\Users\Administrator\AppData下面的RabbitMQ,query檔案中 System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } //使用下面2中確認方式 //第一種 if (channel.WaitForConfirms())//返回true 表示訊息傳送到訊息佇列,否則傳送失敗 { Console.WriteLine("RabbitMQ 生產者傳送訊息到訊息隊成功"); } //第二種 //channel.WaitForConfirmsOrDie();//確認訊息傳送到訊息佇列,傳送成功則繼續執行,否則即沒發成功的話就會報錯,丟擲異常,在catch中捕獲處理 } catch (Exception ex) { //這個說明生產者傳送訊息到訊息佇列時出錯了,這裡可以記錄錯誤,也可以重試再次傳送等等處理 Console.WriteLine($"RabbitMQ 生產者傳送訊息到訊息佇列時出錯了,錯誤資訊:{ex.Message}"); channel.TxRollback();//回滾事務 2 } #endregion } #endregion Console.WriteLine("RabbitMQ 輸入任何字元退出。。"); Console.Read(); } } }
4、控制檯專案ConsoleRabbitMQ01專案的Program程式碼如下
using RabbitMQ.Client; using System; using RabbitMQ.Client.Events; namespace ConsoleRabbitMQ01 { class Program { static void Main(string[] args) { Console.WriteLine("RabbitMQ 消費者開始。。。消費。。。!"); #region RabbitMQ 消費者 var connectionFactory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 佇列持久化 channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null); //durable: true 交換機持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); var consumer = new EventingBasicConsumer(channel);//消費事件 consumer.Received += (sender, e) => { //下面操作包括事務處理 var body = System.Text.Encoding.UTF8.GetString(e.Body.ToArray()); // //處理訊息具體處理過程 Console.WriteLine("RabbitMQ 消費者已經消費訊息"); // ////手動確認,正常消費,通知訊息中心,該條訊息可以刪除了,手動確認的話,自動確認要設定為false //channel.BasicAck(e.DeliveryTag, false); //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer); ////手動確認,非正常消費即出錯出現異常,通知訊息中心,手動確認的話,自動確認要設定為false //BasicReject 中requeue: true 告訴訊息佇列,出錯,但是重新把訊息插入到佇列中,下次使用 //BasicReject 中requeue: false 告訴訊息佇列,出錯,刪除該條訊息 channel.BasicReject(e.DeliveryTag,requeue: true); //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer); //自動確認,表示已成功從訊息佇列中讀取訊息,通知訊息佇列 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer); }; } #endregion Console.WriteLine("RabbitMQ 輸入任何字元退出。。"); Console.Read(); } } }
5、啟動RabbitMQ服務,輸入命令:rabbitmq-service start
6、分別啟動ConsoleRabbitMQ.exe和ConsoleRabbitMQ01.exe這個兩個專案
瀏覽器中輸入:http://localhost:15672/#/,點選Connections,Channels,Queues可以檢視相關資訊