1. 程式人生 > 其它 >RabbitMQ .NET Core 分散式事務

RabbitMQ .NET Core 分散式事務

使用 .NET 5 + RabbitMQ 實現一個分散式事務,並保證最終一致性

流程為:

  減庫存 -> 減餘額 -> 建立訂單

RabbitMQ 中建立六個佇列:

  減庫存佇列、減庫存死信佇列

  減餘額佇列、減餘額死信佇列

  建立訂單佇列、建立訂單死信佇列

一個 WebAPI 用來發起流程

四個控制檯,三個用來消費各自佇列中的訊息,一個對各種的錯誤進行協調

 

訊息的可靠性

  生產者確認

// 開啟生產者確認
Channel.ConfirmSelect();
// 傳送訊息
Channel.BasicPublish(exchange, routingKey, props, body);
// 生產者確認 bool isSendMsgOk = Channel.WaitForConfirms(); // 進行訊息重發 for (int i = 0; i < repeat && !isSendMsgOk; i++) { Channel.BasicPublish(exchange, routingKey, props, body); isSendMsgOk = Channel.WaitForConfirms(); }

  消費者確認

client.Channel.BasicAck(ea.DeliveryTag, false);

  傳送訊息的同時把訊息持久化到資料庫中,並記錄當前狀態,到下一個環節的時候修改該訊息的狀態。

消費者異常

  這裡就涉及到,異常後訊息重新投遞的問題了。

  如果 NACK 那麼這個訊息會回到佇列的最上面,然後消費者在進行消費,這時候就遇見一個問題,不知道這個訊息 Retry 了幾次,因此需要一箇中間介子記錄一下這個訊息 Retry 的次數。如果超過了一個閾值就XXX處理。

  我這裡使用的是死信佇列進行處理的,消費者異常後直接 Nack ,並把 Request 設定成 false,該訊息就會進入到對應的死信佇列中。然後又一個排程者,訂閱死信佇列,把訊息重新投遞到佇列中,並控制重試次數,如果超過閾值就XXX處理。

 

訊息的順序

  因為整個流程都是同步進行的所以不存在順序問題

 

重複消費

  生產者保證一定能把訊息傳送出去就行了

  消費者需要保證業務程式碼必須冪等。執行 SQL 的時候使用 if else 判斷一下資料是否已經存在,如果不存在就執行相關的 SQL。(減庫存、減餘額的時候 Where 庫存 >= 扣減庫存)

 

 

 

生產者

RabbitMQClient _mQClient;
public OrderController(RabbitMQClient mQClient)
{
    _mQClient = mQClient;
}

[HttpPost]
public OrderDto CreateOrder(OrderDto dto)
{
    _mQClient.Publish(dto, "DeductStock_Exchange", "", true);
    return dto;
}

Startup

services.AddSingleton(typeof(RabbitMQClient));

RabbitMQClient

   public class RabbitMQClient
    {
        private readonly IConfiguration _configuration;
        private bool IsEvent = true;
        private IModel _channel;
        public RabbitMQClient(IConfiguration configuration)
        {
            _configuration = configuration;
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = _configuration["RabbitmqConfig:Username"],
                Password = _configuration["RabbitmqConfig:Password"],
                HostName = _configuration["RabbitmqConfig:Host"],
                VirtualHost = _configuration["RabbitmqConfig:VirtualHost"],
                AutomaticRecoveryEnabled = true, //網路故障自動連線恢復
            };
            Connection = factory.CreateConnection();
        }
        public IConnection Connection { get; }
        public IModel Channel
        {
            get
            {
                if (_channel == null || !_channel.IsOpen)
                {
                    _channel = Connection.CreateModel();
                }
                return _channel;
            }
        }
        public void Publish<T>(T message, string exchange, string routingKey, bool isConfirm = true, int repeat = 5)
        {
            if (isConfirm)
            {
                Channel.ConfirmSelect();
            }
            var sendBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
            PublishMessage(sendBytes, exchange, routingKey, isConfirm);
            if (isConfirm)
            {
                bool isSendMsgOk = Channel.WaitForConfirms();
                for (int i = 0; i < repeat && !isSendMsgOk; i++)
                {
                    // 進行訊息重發
                    PublishMessage(sendBytes, exchange, routingKey, isConfirm);
                    isSendMsgOk = Channel.WaitForConfirms();
                }
            }
        }
        private void PublishMessage(ReadOnlyMemory<byte> body, string exchange, string routingKey, bool isConfirm = true)
        {
            IBasicProperties props = Channel.CreateBasicProperties();
            props.MessageId = Guid.NewGuid().ToString();
            Channel.BasicPublish(exchange, routingKey, props, body);
        }
    }
}

消費者+生產者

訂閱減庫存的佇列,消費成功後向減餘額的佇列中傳送訊息

    static void Main(string[] args)
    {
        Console.Title = "減庫存";
        var build = new HostBuilder();
        build.ConfigureServices((hostContext, services) =>
        {
            var configuration = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json")
                .Build();
            RabbitMQClient client = new RabbitMQClient(configuration);

            string exchangeName = "DeductStock_Exchange";
            string queueName = "DeductStock_Queue";
            string routingKey = "DeductStock_Routing";

            string dead_ExchangeName = "DeductStock_Exchange_dead";
            string dead_QueueName = "DeductStock_Queue_dead";
            string dead_RoutingKey = "DeductStock_Routing_dead";

            client.Channel.ExchangeDeclare(dead_ExchangeName, type: "fanout", durable: true, autoDelete: false);
            client.Channel.QueueDeclare(dead_QueueName, durable: true, exclusive: false, autoDelete: false);
            client.Channel.QueueBind(dead_QueueName, dead_ExchangeName, dead_RoutingKey);

            client.Channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);
            client.Channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> {
                { "x-dead-letter-exchange", dead_ExchangeName },
                { "x-dead-letter-routing-key", dead_RoutingKey },
                { "x-message-ttl", 10000 }
            });
            client.Channel.QueueBind(queueName, exchangeName, routingKey);

            //事件基本消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(client.Channel);

            //接收到訊息事件
            consumer.Received += (ch, ea) =>
            {
                try
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine($"收到訊息: { message }");
                    OrderDto dto = JsonConvert.DeserializeObject<OrderDto>(message);

                    if (dto.Id % 10 == 1 && !dto.IsBug)
                    {
                        throw new Exception();
                    }

                    //確認該訊息已被消費,確認完成後 RabbitMQ 會刪除該訊息
                    client.Channel.BasicAck(ea.DeliveryTag, false);

                    client.Publish(dto, "DeductBalance_Exchange", "", true);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("庫存異常");
                    client.Channel.BasicNack(ea.DeliveryTag, false, false);
                }
            };
            client.Channel.BasicConsume(queueName, false, consumer);
            Console.WriteLine("庫存消費者");
        }).Build().Run();
    }
}

減餘額的和建立訂單的基本一樣

最後一個排程

訂閱死信佇列,然後把死信佇列中的訊息重新投遞到佇列中,讓佇列繼續處理相關的業務

如果達到閾值,或者出現什麼問題把訊息持久化到硬碟上面

{
    var model = new DispatchModel("", "DeductStock_Exchange", "DeductStock_Queue_dead", Directory.GetCurrentDirectory() + "\\DeductStock.txt");
    var channel = Connection.CreateModel();
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine($"收到訊息: { message }");
        OrderDto dto = JsonConvert.DeserializeObject<OrderDto>(message);
        try
        {
            channel.BasicAck(ea.DeliveryTag, false);
            if (dto.RetryCount >= 5)
            {
                File.WriteAllText(model.messageFilePath, message);
                return;
            }
            dto.RetryCount += 1;

            // 開啟發送確認
            channel.ConfirmSelect();
            var sendBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dto));
            channel.BasicPublish(model.exchangeName, model.routingKey, null, sendBytes);
            bool isSendMsgOk = channel.WaitForConfirms();

            for (int i = 0; i < 5 && !isSendMsgOk; i++)
            {
                // 進行訊息重發
                channel.BasicPublish(model.exchangeName, model.routingKey, null, sendBytes);
                isSendMsgOk = channel.WaitForConfirms();
            }
            if (!isSendMsgOk)
            {
                /// 傳送六次都沒有成功
                /// 訊息快取到本地
                File.WriteAllText(model.messageFilePath, message);
            }
        }
        catch (Exception ex)
        {
            // 確認消費
            channel.BasicAck(ea.DeliveryTag, false);
            // 訊息快取到本地
            File.WriteAllText(model.messageFilePath, message);
        }
    };
    //啟動消費者 設定為手動應答訊息
    channel.BasicConsume(model.dead_queueName, false, consumer);
}