MassTransit .NET Core 分散式事務
阿新 • • 發佈:2022-04-05
使用 .NET 5 + MassTransit 8.0.1 實現一個分散式事務,並保證最終一致性
WebAPI
// Resolves send endpoints for queues on the configured bus.
private readonly ISendEndpointProvider _sendEndpointProvider;

/// <summary>
/// Creates the controller with the send-endpoint provider supplied by the caller.
/// </summary>
/// <param name="sendEndpointProvider">Provider used to look up the stock-deduction queue.</param>
public DemoController(ISendEndpointProvider sendEndpointProvider)
{
    _sendEndpointProvider = sendEndpointProvider;
}

/// <summary>
/// Sends a <c>DeductStockDto</c> message to the <c>DeductStock_Queue</c> queue and returns 200 OK.
/// </summary>
/// <param name="id">Identifier taken from the request; also appended to the message text.</param>
/// <returns>An empty OK result once the message has been sent.</returns>
[HttpGet]
public async Task<ActionResult> Get(string id)
{
    var stockQueueAddress = new Uri("queue:DeductStock_Queue");
    ISendEndpoint stockQueue = await _sendEndpointProvider.GetSendEndpoint(stockQueueAddress);

    // The anonymous object is a message initializer; its property names must match the DTO.
    var payload = new
    {
        Id = id,
        Message = string.Concat("Meeeeeeeee", id)
    };
    await stockQueue.Send<DeductStockDto>(payload);

    return Ok();
}
Startup
/// <summary>
/// Registers MVC controllers, the MassTransit bus over RabbitMQ, and the bus host start/stop options.
/// </summary>
/// <param name="services">Service collection to populate.</param>
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();

    // No consumers are registered here; this snippet only configures the RabbitMQ transport.
    services.AddMassTransit(registration =>
        registration.UsingRabbitMq((busContext, bus) =>
        {
            var brokerAddress = new Uri("rabbitmq://192.168.214.129/MassTransit_vhost");
            bus.Host(brokerAddress, credentials =>
            {
                credentials.Username("guest");
                credentials.Password("guest");
            });
        }));

    services.Configure<MassTransitHostOptions>(hostOptions =>
    {
        hostOptions.WaitUntilStarted = true;
        hostOptions.StartTimeout = TimeSpan.FromSeconds(30);
        hostOptions.StopTimeout = TimeSpan.FromMinutes(1);
    });
}
減庫存
/// <summary>
/// Entry point of the stock-deduction worker: builds a generic host that runs a MassTransit
/// bus over RabbitMQ with one receive endpoint per consumer, then blocks in <c>Run()</c>.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    Console.Title = "減庫存";

    new HostBuilder()
        .ConfigureServices((hostContext, services) =>
        {
            services.AddMassTransit(registration =>
            {
                registration.AddConsumer<DeductStockConsumer>();
                registration.AddConsumer<DeductStockErrorConsumer>();

                registration.UsingRabbitMq((busContext, bus) =>
                {
                    var brokerAddress = new Uri("rabbitmq://192.168.214.129/MassTransit_vhost");
                    bus.Host(brokerAddress, credentials =>
                    {
                        credentials.Username("guest");
                        credentials.Password("guest");
                    });

                    // Main work queue.
                    bus.ReceiveEndpoint(
                        "DeductStock_Queue",
                        endpoint => endpoint.ConfigureConsumer<DeductStockConsumer>(busContext));

                    // Separate endpoint bound to the "_error"-suffixed queue name.
                    bus.ReceiveEndpoint(
                        "DeductStock_Queue_error",
                        endpoint => endpoint.ConfigureConsumer<DeductStockErrorConsumer>(busContext));
                });
            });

            services.Configure<MassTransitHostOptions>(hostOptions =>
            {
                hostOptions.WaitUntilStarted = true;
                hostOptions.StartTimeout = TimeSpan.FromSeconds(30);
                hostOptions.StopTimeout = TimeSpan.FromMinutes(1);
            });
        })
        .Build()
        .Run();
}
DeductStockConsumer
/// <summary>
/// Consumes <see cref="DeductStockDto"/> messages, logs the id, and forwards the message
/// to the <c>DeductBalance_Queue</c> queue as a <c>DeductBalanceDto</c>.
/// </summary>
public class DeductStockConsumer : IConsumer<DeductStockDto>
{
    /// <summary>
    /// Handles one stock-deduction message.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when <c>Id % 10 == 1</c> and <c>IsBug</c> is false (the demo's simulated failure).
    /// </exception>
    public async Task Consume(ConsumeContext<DeductStockDto> context)
    {
        var message = context.Message;
        Console.WriteLine("扣庫存:" + message.Id);

        // Simulated failure: such ids fail until the message comes back with IsBug set.
        if (message.Id % 10 == 1 && !message.IsBug)
        {
            // A specific exception type with a non-empty message keeps the failure diagnosable.
            throw new InvalidOperationException(
                $"Simulated stock deduction failure for message Id {message.Id}.");
        }

        var endpoint = await context.GetSendEndpoint(new Uri("queue:DeductBalance_Queue"));
        await endpoint.Send<DeductBalanceDto>(new
        {
            Id = message.Id,
            Message = message.Message + ":DeductStockConsumer",
            IsBug = message.IsBug,
            RetryCount = message.RetryCount
        });
    }
}

/// <summary>
/// Message contract for the stock-deduction step.
/// </summary>
public class DeductStockDto
{
    /// <summary>Identifier of the request being processed.</summary>
    public int Id { get; set; }

    // NOTE(review): DeductStock and DeductBalance are not read or written by the visible consumers.
    public int DeductStock { get; set; }

    public int DeductBalance { get; set; }

    /// <summary>Free-form text; each consumer appends its own suffix when forwarding.</summary>
    public string Message { get; set; }

    /// <summary>When true, <see cref="DeductStockConsumer"/> skips its simulated failure.</summary>
    public bool IsBug { get; set; }

    /// <summary>Number of times the message has been re-sent by the error consumer.</summary>
    public int RetryCount { get; set; }
}
DeductStockErrorConsumer
/// <summary>
/// Consumes <see cref="DeductStockDto"/> messages that failed stock deduction and re-sends
/// them to <c>DeductStock_Queue</c> with <c>IsBug</c> set and <c>RetryCount</c> incremented.
/// </summary>
public class DeductStockErrorConsumer : IConsumer<DeductStockDto>
{
    // Re-sending stops once the incoming RetryCount reaches this value.
    private const int MaxRetryCount = 5;

    /// <summary>
    /// Handles one failed stock-deduction message.
    /// </summary>
    /// <param name="context">Consume context carrying the failed message.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the message's <c>RetryCount</c> is already 5 or more.
    /// </exception>
    public async Task Consume(ConsumeContext<DeductStockDto> context)
    {
        var failed = context.Message;
        Console.WriteLine("減庫存異常:" + failed.Id);

        if (failed.RetryCount >= MaxRetryCount)
        {
            throw new InvalidOperationException(
                $"Stock deduction for message Id {failed.Id} still failing after {failed.RetryCount} retries; giving up.");
        }

        // Build the follow-up values locally instead of mutating the received message.
        var nextRetryCount = failed.RetryCount + 1;

        var endpoint = await context.GetSendEndpoint(new Uri("queue:DeductStock_Queue"));

        // NOTE(review): the ":DeductStockConsumer" suffix is kept as in the original; it may have
        // been meant to name this consumer instead. DeductStock/DeductBalance are not forwarded.
        await endpoint.Send<DeductStockDto>(new
        {
            Id = failed.Id,
            Message = failed.Message + ":DeductStockConsumer",
            IsBug = true,
            RetryCount = nextRetryCount
        });
    }
}
異常重試
// Retry a failed message immediately, up to 20 times, on this receive endpoint.
e.UseMessageRetry(retry => retry.Immediate(20));
如果消費訊息時出現異常(且重試次數已用盡),MassTransit 會自動建立一個錯誤佇列(原佇列名稱加上 _error 後綴,例如 DeductStock_Queue_error),並將該訊息移入其中
如果訊息未被消費(佇列上沒有對應該訊息型別的消費者),MassTransit 會自動建立一個死信佇列(原佇列名稱加上 _skipped 後綴)來存放這些訊息
MassTransit 對異常的訊息可以進行重新投遞