1. 程式人生 > 其它 >MassTransit .NET Core 分散式事務

MassTransit .NET Core 分散式事務

使用 .NET 5 + MassTransit 8.0.1 實現一個分散式事務,並保證最終一致性

 

WebAPI

readonly ISendEndpointProvider _sendEndpointProvider;
public DemoController(ISendEndpointProvider sendEndpointProvider)
{
    _sendEndpointProvider = sendEndpointProvider;
}

[HttpGet]
public async Task<ActionResult> Get(string id)
{
    
string message = "Meeeeeeeee" + id; var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:DeductStock_Queue")); await endpoint.Send<DeductStockDto>(new { Id = id, Message = message }); return Ok(); }

Startup

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();
    services.AddMassTransit(x 
=> { x.UsingRabbitMq((context, cfg) => { cfg.Host(new Uri("rabbitmq://192.168.214.129/MassTransit_vhost"), c => { c.Username("guest"); c.Password("guest"); }); }); }); services.Configure<MassTransitHostOptions>(options => { options.WaitUntilStarted
= true; options.StartTimeout = TimeSpan.FromSeconds(30); options.StopTimeout = TimeSpan.FromMinutes(1); }); }

減庫存·

static void Main(string[] args)
{
    Console.Title = "減庫存";

    var builder = new HostBuilder();
    builder.ConfigureServices((hostContext, services) =>
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<DeductStockConsumer>();
            x.AddConsumer<DeductStockErrorConsumer>();
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.Host(new Uri("rabbitmq://192.168.214.129/MassTransit_vhost"), c =>
                {
                    c.Username("guest");
                    c.Password("guest");
                });

                cfg.ReceiveEndpoint("DeductStock_Queue", e =>
                {
                    e.ConfigureConsumer<DeductStockConsumer>(context);
                });

                cfg.ReceiveEndpoint("DeductStock_Queue_error", e =>
                {
                    e.ConfigureConsumer<DeductStockErrorConsumer>(context);
                });
            });
        });

        services.Configure<MassTransitHostOptions>(options =>
        {
            options.WaitUntilStarted = true;
            options.StartTimeout = TimeSpan.FromSeconds(30);
            options.StopTimeout = TimeSpan.FromMinutes(1);
        });
    })
    .Build().Run();
}

DeductStockConsumer

public class DeductStockConsumer : IConsumer<DeductStockDto>
{
    public async Task Consume(ConsumeContext<DeductStockDto> context)
    {
        Console.WriteLine("扣庫存:" + context.Message.Id);

        if (context.Message.Id % 10 == 1 && !context.Message.IsBug)
        {
            throw new Exception("");
        }



        var endpoint = await context.GetSendEndpoint(new Uri("queue:DeductBalance_Queue"));
        await endpoint.Send<DeductBalanceDto>(new
        {
            Id = context.Message.Id,
            Message = context.Message.Message + ":DeductStockConsumer",
            IsBug = context.Message.IsBug,
            RetryCount = context.Message.RetryCount
        });

        //return Task.CompletedTask;
    }
}

public class DeductStockDto
{
    public int Id { get; set; }
    public int DeductStock { get; set; }
    public int DeductBalance { get; set; }
    public string Message { get; set; }
    public bool IsBug { get; set; }
    public int RetryCount { get; set; }
}

DeductStockErrorConsumer

public class DeductStockErrorConsumer : IConsumer<DeductStockDto>
{
    public async Task Consume(ConsumeContext<DeductStockDto> context)
    {
        Console.WriteLine("減庫存異常:" + context.Message.Id);
        if (context.Message.RetryCount >= 5)
        {
            throw new Exception();
        }
        context.Message.RetryCount += 1;
        context.Message.IsBug = true;
        var endpoint = await context.GetSendEndpoint(new Uri("queue:DeductStock_Queue"));
        await endpoint.Send<DeductStockDto>(new
        {
            Id = context.Message.Id,
            Message = context.Message.Message + ":DeductStockConsumer",
            IsBug = context.Message.IsBug,
            RetryCount = context.Message.RetryCount
        });
    }
}

 

異常重試

e.UseMessageRetry(r =>
{
    r.Immediate(20);
});

 

如果出現異常,MassTransit 會自動建立一個錯誤佇列

如果訊息未被消費,MassTransit 會自動建立一個死信佇列

MassTransit 對異常的訊息可以進行重新投遞