1. 程式人生 > 實用技巧 >RabbitMQ使用交換機處理非同步訊息佇列------分散式事務處理案例

RabbitMQ使用交換機處理非同步訊息佇列------分散式事務處理案例

RabbitMQ使用交換機處理非同步訊息佇列案例的安裝環境可以參考RabbitMQ環境準備/環境搭建

本片在RabbitMQ環境已有的基礎上講述RabbitMQ使用交換機處理非同步訊息佇列------分散式事務處理案例具體過程

1、新建.NET Core console控制檯專案ConsoleRabbitMQ專案(生產者 productor,即產生訊息的)以及ConsoleRabbitMQ01專案(consumer消費者,即使用訊息的)

2、對控制檯專案使用NuGet程式管理包新增RabbitMQ.Client

3、控制檯專案ConsoleRabbitMQ專案的Program程式碼如下

using RabbitMQ.Client;
using System;

namespace ConsoleRabbitMQ
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("RabbitMQ 生產者開始。。。生產。。。!");

            #region RabbitMQ 生產者
            var connectionFactory = new ConnectionFactory()
            {
                HostName 
= "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 佇列持久化 channel.QueueDeclare(queue: "
myqueue", durable: true, false, false, null); //durable: true 交換機持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); //持久化訊息,告訴訊息佇列,該條訊息需要持久化和固化到磁碟 var propertyPersist = channel.CreateBasicProperties(); propertyPersist.Persistent = true; channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); #region Tx事務處理,不推薦使用,處理過程較複雜 //channel.TxSelect();//開起事務 1 //for (int i = 0; i < 100; i++) //{ // var body = System.Text.Encoding.UTF8.GetBytes($"這是釋出的資料。{i}。"); // //持久化訊息 basicProperties: propertyPersist // channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//傳送訊息給訊息佇列,之後訊息佇列收到以後會進行初持久化處理,儲存路徑C:\Users\Administrator\AppData下面的RabbitMQ,query檔案中 // System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); //} //try //{ // channel.TxCommit();//提交事務 1 //} //catch (Exception ex) //{ // //這個說明生產者傳送訊息到訊息佇列時出錯了,這裡可以記錄錯誤,也可以重試再次傳送等等處理 // Console.WriteLine($"RabbitMQ 生產者傳送訊息到訊息佇列時出錯了,錯誤資訊:{ex.Message}"); // channel.TxRollback();//回滾事務 1 //} #endregion #region Tx事務處理,推薦使用 try { channel.ConfirmSelect();//開起訊息確認模式 2 這個rabbitmq的擴充套件,可以看成一個回撥 for (int i = 0; i < 100; i++) { var body = System.Text.Encoding.UTF8.GetBytes($"這是釋出的資料。{i}。"); //持久化訊息 basicProperties: propertyPersist channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//傳送訊息給訊息佇列,之後訊息佇列收到以後會進行初持久化處理,儲存路徑C:\Users\Administrator\AppData下面的RabbitMQ,query檔案中 System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } //使用下面2中確認方式 //第一種 if (channel.WaitForConfirms())//返回true 表示訊息傳送到訊息佇列,否則傳送失敗 { Console.WriteLine("RabbitMQ 生產者傳送訊息到訊息隊成功"); } //第二種 //channel.WaitForConfirmsOrDie();//確認訊息傳送到訊息佇列,傳送成功則繼續執行,否則即沒發成功的話就會報錯,丟擲異常,在catch中捕獲處理 } catch (Exception ex) { //這個說明生產者傳送訊息到訊息佇列時出錯了,這裡可以記錄錯誤,也可以重試再次傳送等等處理 Console.WriteLine($"RabbitMQ 生產者傳送訊息到訊息佇列時出錯了,錯誤資訊:{ex.Message}"); channel.TxRollback();//回滾事務 2 } #endregion } #endregion Console.WriteLine("RabbitMQ 輸入任何字元退出。。"); Console.Read(); } } }

4、控制檯專案ConsoleRabbitMQ01專案的Program程式碼如下

using RabbitMQ.Client;
using System;
using RabbitMQ.Client.Events;

namespace ConsoleRabbitMQ01
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("RabbitMQ 消費者開始。。。消費。。。!");

            #region RabbitMQ 消費者
            var connectionFactory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest"
            };
            using (var connection = connectionFactory.CreateConnection())
            {
                using var channel = connection.CreateModel();
                // durable: true 佇列持久化
                channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null);
                //durable: true 交換機持久化
                channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null);
                channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null);
                var consumer = new EventingBasicConsumer(channel);//消費事件
                consumer.Received += (sender, e) =>
                {

                    //下面操作包括事務處理
                    var body = System.Text.Encoding.UTF8.GetString(e.Body.ToArray());
                    //
                    //處理訊息具體處理過程
                    Console.WriteLine("RabbitMQ 消費者已經消費訊息");
                    //

                    ////手動確認,正常消費,通知訊息中心,該條訊息可以刪除了,手動確認的話,自動確認要設定為false
                    //channel.BasicAck(e.DeliveryTag, false);
                    //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer);

                    ////手動確認,非正常消費即出錯出現異常,通知訊息中心,手動確認的話,自動確認要設定為false
                    //BasicReject 中requeue: true 告訴訊息佇列,出錯,但是重新把訊息插入到佇列中,下次使用
                    //BasicReject 中requeue: false 告訴訊息佇列,出錯,刪除該條訊息
                    channel.BasicReject(e.DeliveryTag,requeue: true);
                    //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer);

                    //自動確認,表示已成功從訊息佇列中讀取訊息,通知訊息佇列
                    channel.BasicConsume(queue: "myqueue", autoAck: true, consumer);
                };
            }
            #endregion
            Console.WriteLine("RabbitMQ 輸入任何字元退出。。");
            Console.Read();
        }
    }
}

5、啟動RabbitMQ服務,輸入命令:rabbitmq-service start

6、分別啟動ConsoleRabbitMQ.exe和ConsoleRabbitMQ01.exe這個兩個專案

瀏覽器中輸入:http://localhost:15672/#/,點選Connections,Channels,Queues可以檢視相關資訊