RabbitMQ .NET Core 分散式事務
使用 .NET 5 + RabbitMQ 實現一個分散式事務,並保證最終一致性
流程為:
減庫存 -> 減餘額 -> 建立訂單
RabbitMQ 中建立六個佇列:
減庫存佇列、減庫存死信佇列
減餘額佇列、減餘額死信佇列
建立訂單佇列、建立訂單死信佇列
一個 WebAPI 用來發起流程
四個控制檯,三個用來消費各自佇列中的訊息,一個對各種的錯誤進行協調
訊息的可靠性
生產者確認
// 開啟生產者確認 Channel.ConfirmSelect(); // 傳送訊息 Channel.BasicPublish(exchange, routingKey, props, body);// 生產者確認 bool isSendMsgOk = Channel.WaitForConfirms(); // 進行訊息重發 for (int i = 0; i < repeat && !isSendMsgOk; i++) { Channel.BasicPublish(exchange, routingKey, props, body); isSendMsgOk = Channel.WaitForConfirms(); }
消費者確認
client.Channel.BasicAck(ea.DeliveryTag, false);
傳送訊息的同時把訊息持久化到資料庫中,並記錄當前狀態,到下一個環節的時候修改該訊息的狀態。
消費者異常
這裡就涉及到,異常後訊息重新投遞的問題了。
如果 NACK 那麼這個訊息會回到佇列的最上面,然後消費者在進行消費,這時候就遇見一個問題,不知道這個訊息 Retry 了幾次,因此需要一箇中間介子記錄一下這個訊息 Retry 的次數。如果超過了一個閾值就XXX處理。
我這裡使用的是死信佇列進行處理的,消費者異常後直接 Nack ,並把 Request 設定成 false,該訊息就會進入到對應的死信佇列中。然後又一個排程者,訂閱死信佇列,把訊息重新投遞到佇列中,並控制重試次數,如果超過閾值就XXX處理。
訊息的順序
因為整個流程都是同步進行的所以不存在順序問題
重複消費
生產者保證一定能把訊息傳送出去就行了
消費者需要保證業務程式碼必須冪等。執行 SQL 的時候使用 if else 判斷一下資料是否已經存在,如果不存在就執行相關的 SQL。(減庫存、減餘額的時候 Where 庫存 >= 扣減庫存)
生產者
RabbitMQClient _mQClient; public OrderController(RabbitMQClient mQClient) { _mQClient = mQClient; } [HttpPost] public OrderDto CreateOrder(OrderDto dto) { _mQClient.Publish(dto, "DeductStock_Exchange", "", true); return dto; }
Startup
services.AddSingleton(typeof(RabbitMQClient));
RabbitMQClient
public class RabbitMQClient { private readonly IConfiguration _configuration; private bool IsEvent = true; private IModel _channel; public RabbitMQClient(IConfiguration configuration) { _configuration = configuration; ConnectionFactory factory = new ConnectionFactory { UserName = _configuration["RabbitmqConfig:Username"], Password = _configuration["RabbitmqConfig:Password"], HostName = _configuration["RabbitmqConfig:Host"], VirtualHost = _configuration["RabbitmqConfig:VirtualHost"], AutomaticRecoveryEnabled = true, //網路故障自動連線恢復 }; Connection = factory.CreateConnection(); } public IConnection Connection { get; } public IModel Channel { get { if (_channel == null || !_channel.IsOpen) { _channel = Connection.CreateModel(); } return _channel; } } public void Publish<T>(T message, string exchange, string routingKey, bool isConfirm = true, int repeat = 5) { if (isConfirm) { Channel.ConfirmSelect(); } var sendBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); PublishMessage(sendBytes, exchange, routingKey, isConfirm); if (isConfirm) { bool isSendMsgOk = Channel.WaitForConfirms(); for (int i = 0; i < repeat && !isSendMsgOk; i++) { // 進行訊息重發 PublishMessage(sendBytes, exchange, routingKey, isConfirm); isSendMsgOk = Channel.WaitForConfirms(); } } } private void PublishMessage(ReadOnlyMemory<byte> body, string exchange, string routingKey, bool isConfirm = true) { IBasicProperties props = Channel.CreateBasicProperties(); props.MessageId = Guid.NewGuid().ToString(); Channel.BasicPublish(exchange, routingKey, props, body); } } }
消費者+生產者
訂閱減庫存的佇列,消費成功後向減餘額的佇列中傳送訊息
static void Main(string[] args) { Console.Title = "減庫存"; var build = new HostBuilder(); build.ConfigureServices((hostContext, services) => { var configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("appsettings.json") .Build(); RabbitMQClient client = new RabbitMQClient(configuration); string exchangeName = "DeductStock_Exchange"; string queueName = "DeductStock_Queue"; string routingKey = "DeductStock_Routing"; string dead_ExchangeName = "DeductStock_Exchange_dead"; string dead_QueueName = "DeductStock_Queue_dead"; string dead_RoutingKey = "DeductStock_Routing_dead"; client.Channel.ExchangeDeclare(dead_ExchangeName, type: "fanout", durable: true, autoDelete: false); client.Channel.QueueDeclare(dead_QueueName, durable: true, exclusive: false, autoDelete: false); client.Channel.QueueBind(dead_QueueName, dead_ExchangeName, dead_RoutingKey); client.Channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false); client.Channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange", dead_ExchangeName }, { "x-dead-letter-routing-key", dead_RoutingKey }, { "x-message-ttl", 10000 } }); client.Channel.QueueBind(queueName, exchangeName, routingKey); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(client.Channel); //接收到訊息事件 consumer.Received += (ch, ea) => { try { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"收到訊息: { message }"); OrderDto dto = JsonConvert.DeserializeObject<OrderDto>(message); if (dto.Id % 10 == 1 && !dto.IsBug) { throw new Exception(); } //確認該訊息已被消費,確認完成後 RabbitMQ 會刪除該訊息 client.Channel.BasicAck(ea.DeliveryTag, false); client.Publish(dto, "DeductBalance_Exchange", "", true); } catch (Exception ex) { Console.WriteLine("庫存異常"); client.Channel.BasicNack(ea.DeliveryTag, false, false); } }; client.Channel.BasicConsume(queueName, false, consumer); Console.WriteLine("庫存消費者"); }).Build().Run(); } }
減餘額的和建立訂單的基本一樣
最後一個排程
訂閱死信佇列,然後把死信佇列中的訊息重新投遞到佇列中,讓佇列繼續處理相關的業務
如果達到閾值,或者出現什麼問題把訊息持久化到硬碟上面
{ var model = new DispatchModel("", "DeductStock_Exchange", "DeductStock_Queue_dead", Directory.GetCurrentDirectory() + "\\DeductStock.txt"); var channel = Connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"收到訊息: { message }"); OrderDto dto = JsonConvert.DeserializeObject<OrderDto>(message); try { channel.BasicAck(ea.DeliveryTag, false); if (dto.RetryCount >= 5) { File.WriteAllText(model.messageFilePath, message); return; } dto.RetryCount += 1; // 開啟發送確認 channel.ConfirmSelect(); var sendBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dto)); channel.BasicPublish(model.exchangeName, model.routingKey, null, sendBytes); bool isSendMsgOk = channel.WaitForConfirms(); for (int i = 0; i < 5 && !isSendMsgOk; i++) { // 進行訊息重發 channel.BasicPublish(model.exchangeName, model.routingKey, null, sendBytes); isSendMsgOk = channel.WaitForConfirms(); } if (!isSendMsgOk) { /// 傳送六次都沒有成功 /// 訊息快取到本地 File.WriteAllText(model.messageFilePath, message); } } catch (Exception ex) { // 確認消費 channel.BasicAck(ea.DeliveryTag, false); // 訊息快取到本地 File.WriteAllText(model.messageFilePath, message); } }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume(model.dead_queueName, false, consumer); }