1. 程式人生 > 實用技巧 >使用 Masstransit中的 Request/Response 與 Courier 功能實現最終一致性

使用 Masstransit中的 Request/Response 與 Courier 功能實現最終一致性

簡介

目前的.net 生態中,最終一致性元件的選擇一直是一個問題。本地事務表(cap需要在每個服務的資料庫中插入訊息表,而且做不了此類事務 比如:建立訂單需要 餘額滿足+庫存滿足,庫存和餘額處於兩個服務中。masstransit 是我目前主要用的方案。以往一般都用 masstransit 中的 sagas 來實現 最終一致性,但是隨著併發的增加必定會對sagas 持久化的資料庫造成很大的壓力,根據stackoverflow 中的一個回答我發現了 一個用Request/Response 與 Courier 功能 實現最終一致性的方案Demo地址

Masstransit 中 Resquest/Response 功能

訊息DTO

    public class SampleMessageCommand
{
}

消費者

    public class SampleMessageCommandHandler : IConsumer<SampleMessageCommand>
{
public async Task Consume(ConsumeContext<SampleMessageCommand> context)
{
await context.RespondAsync(new SampleMessageCommandResult() { Data = "Sample" });
}
}

返回結果DTO

    public class SampleMessageCommandResult
{
public string Data { get; set; }
}

呼叫方式與註冊方式略過,詳情請看官方檔案

  

本質上使用訊息佇列實現Resquest/Response,客戶端(生產者)將請求訊息傳送至指定訊息佇列並賦予RequestId和ResponseAddress(臨時佇列 rabbitmq),服務端(消費者)消費訊息並把 需要返回的訊息放入指定ResponseAddress,客戶端收到 Response message 通過匹配 RequestId 找到 指定Request,最後返回資訊。

Masstransit 中Courier 功能

通過有序組合一系列的Activity,得到一個routing slip。每個 activity(忽略Execute Activities) 都有 Execute 和 Compensate 兩個方法。Compensate 用來執撤銷Execute 方法產生的影響(就是回退Execute 方法)。每個 ActivityExecute 最後都會 呼叫Completed 方法把 回退所需要的的資訊記錄在message中,最後持久化到訊息佇列的某一個訊息中。

餘額扣減的Activity ,這裡的DeductBalanceModel 是請求扣減的資料模型,DeductBalanceLog 是回退時需要用到的資訊。

public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
{
private readonly ILogger<DeductBalanceActivity> logger;
public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
{
logger.LogInformation("還原餘額");
var log = context.Log; //可以獲取 所有execute 完成時儲存的資訊
//throw new ArgumentException("some things were wrong");
return context.Compensated();
} public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
{ logger.LogInformation("扣減餘額");
await Task.Delay(100);
return context.Completed(new DeductBalanceLog() { Price = 100 });
}
}

扣減庫存 Activity

    public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
{
private readonly ILogger<DeductStockActivity> logger;
public DeductStockActivity(ILogger<DeductStockActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
{
var log = context.Log;
logger.LogInformation("還原庫存");
return context.Compensated();
} public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
{
var argument = context.Arguments;
logger.LogInformation("扣減庫存");
await Task.Delay(100);
return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
}
}

生成訂單 Execute Activity

    public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
{
private readonly ILogger<CreateOrderActivity> logger;
public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
{
this.logger = logger;
}
public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
{
logger.LogInformation("建立訂單");
await Task.Delay(100);
//throw new CommonActivityExecuteFaildException("當日訂單已達到上限");
return context.CompletedWithVariables(new CreateOrderResult { OrderId="111122",Message="建立訂單成功" });
}
}

  組裝 以上 Activity 生成一個Routing Slip,這是一個有序的組合,扣減庫存=》扣減餘額=》生成訂單

            var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DeductStock", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"), new DeductStockModel { ProductId = request.Message.ProductId }); builder.AddActivity("DeductBalance", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"), new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price }); builder.AddActivity("CreateOrder", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"), new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });
var routingSlip = builder.Build();

  執行 Routing Slip

await bus.Execute(routingSlip);

 

這裡是沒有任何返回值的,所有activity都是 非同步執行,雖然所有的activity可以執行完成或者由於某個Activity執行出錯而全部回退。(其實這裡有一種更壞的情況就是Compensate 出錯,預設情況下 Masstransit 只會傳送一個回退錯誤的訊息,後面講到建立訂單的時候我會把它塞到錯誤佇列裡,這樣我們可以通過修改 Compensatebug後重新匯入到正常佇列來修正資料),這個功能完全滿足不了 建立訂單這個需求,執行 await bus.Execute(routingSlip) 後我們完全不知道訂單到底建立成功,還是由於庫存或餘額不足而失敗了(非同步)。

還好routing slip 在執行過程中產生很多訊息,比如RoutingSlipCompleted ,RoutingSlipCompensationFailed ,RoutingSlipActivityCompleted,RoutingSlipActivityFaulted 等,具體檔案,我們可以訂閱這些事件,再結合Request/Response 實現 建立訂單的功能。

實現建立訂單(庫存滿足+餘額滿足)長流程

建立訂單 command

    /// <summary>
/// 長流程 分散式事務
/// </summary>
public class CreateOrderCommand
{
public string ProductId { get; set; }
public string CustomerId { get; set; }
public int Price { get; set; }
}

事務第一步,扣減庫存相關 程式碼

  public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
{
private readonly ILogger<DeductStockActivity> logger;
public DeductStockActivity(ILogger<DeductStockActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
{
var log = context.Log;
logger.LogInformation("還原庫存");
return context.Compensated();
} public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
{
var argument = context.Arguments;
logger.LogInformation("扣減庫存");
await Task.Delay();
return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = });
}
}
public class DeductStockModel
{
public string ProductId { get; set; }
}
public class DeductStockLog
{
public string ProductId { get; set; }
public int Amount { get; set; }
}

事務第二步,扣減餘額相關程式碼

public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
{
private readonly ILogger<DeductBalanceActivity> logger;
public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
{
logger.LogInformation("還原餘額");
var log = context.Log;
//throw new ArgumentException("some things were wrong");
return context.Compensated();
} public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
{ logger.LogInformation("扣減餘額");
await Task.Delay();
return context.Completed(new DeductBalanceLog() { Price = });
}
}
public class DeductBalanceModel
{
public string CustomerId { get; set; }
public int Price { get; set; }
}
public class DeductBalanceLog
{
public int Price { get; set; }
}

事務第三步,建立訂單相關程式碼

 public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
{
private readonly ILogger<CreateOrderActivity> logger;
public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
{
this.logger = logger;
}
public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
{
logger.LogInformation("建立訂單");
await Task.Delay();
//throw new CommonActivityExecuteFaildException("當日訂單已達到上限");
return context.CompletedWithVariables(new CreateOrderResult { OrderId="",Message="建立訂單成功" });
}
}
public class CreateOrderModel
{
public string ProductId { get; set; }
public string CustomerId { get; set; }
public int Price { get; set; }
}
public class CreateOrderResult
{
public string OrderId { get; set; }
public string Message { get; set; }
}

我通過 消費建立訂單 request,獲取 request 的 response 地址與RequestId,這兩個值 返回 response 時需要用到,我把這些資訊存到RoutingSlip中,並且訂閱RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed 三種事件,當這三種訊息出現時 我會根據 事件類別 和RoutingSlip中 之前加入的 (response 地址與RequestId)生成 Response ,整個過程大概就是這麼個意思,沒理解可以看demo。這裡由於每一個事物所需要用到的RoutingSlip + Request/Response 步驟都類似 可以抽象一下(模板方法),把Activity 的組裝 延遲到派生類去解決,這個代理類Masstransit有 ,但是官方沒有顧及到CompensationFailed 的情況,所以我乾脆自己再寫一個。

    public abstract class RoutingSlipDefaultRequestProxy<TRequest> :
IConsumer<TRequest>
where TRequest : class
{
public async Task Consume(ConsumeContext<TRequest> context)
{
var builder = new RoutingSlipBuilder(NewId.NextGuid()); builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed); builder.AddVariable("RequestId", context.RequestId);
builder.AddVariable("ResponseAddress", context.ResponseAddress);
builder.AddVariable("FaultAddress", context.FaultAddress);
builder.AddVariable("Request", context.Message); await BuildRoutingSlip(builder, context); var routingSlip = builder.Build(); await context.Execute(routingSlip).ConfigureAwait(false);
} protected abstract Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<TRequest> request);
}

這個 是派生類 Routing slip 的拼裝過程

    public class CreateOrderRequestProxy : RoutingSlipDefaultRequestProxy<CreateOrderCommand>

    {
private readonly IConfiguration configuration;
public CreateOrderRequestProxy(IConfiguration configuration)
{
this.configuration = configuration;
}
protected override Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<CreateOrderCommand> request)
{
builder.AddActivity("DeductStock", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"), new DeductStockModel { ProductId = request.Message.ProductId }); builder.AddActivity("DeductBalance", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"), new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price }); builder.AddActivity("CreateOrder", new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"), new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId }); return Task.CompletedTask;
}
}

構造response 基類,主要是對三種情況做處理。

    public abstract class RoutingSlipDefaultResponseProxy<TRequest, TResponse, TFaultResponse> : IConsumer<RoutingSlipCompensationFailed>, IConsumer<RoutingSlipCompleted>,
IConsumer<RoutingSlipFaulted>
where TRequest : class
where TResponse : class
where TFaultResponse : class
{
public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
{
var request = context.Message.GetVariable<TRequest>("Request");
var requestId = context.Message.GetVariable<Guid>("RequestId"); Uri responseAddress = null;
if (context.Message.Variables.ContainsKey("ResponseAddress"))
responseAddress = context.Message.GetVariable<Uri>("ResponseAddress"); if (responseAddress == null)
throw new ArgumentException($"The response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}"); var endpoint = await context.GetResponseEndpoint<TResponse>(responseAddress, requestId).ConfigureAwait(false); var response = await CreateResponseMessage(context, request); await endpoint.Send(response).ConfigureAwait(false);
} public async Task Consume(ConsumeContext<RoutingSlipFaulted> context)
{
var request = context.Message.GetVariable<TRequest>("Request");
var requestId = context.Message.GetVariable<Guid>("RequestId"); Uri faultAddress = null;
if (context.Message.Variables.ContainsKey("FaultAddress"))
faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
faultAddress = context.Message.GetVariable<Uri>("ResponseAddress"); if (faultAddress == null)
throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}"); var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false); var response = await CreateFaultedResponseMessage(context, request, requestId); await endpoint.Send(response).ConfigureAwait(false);
}
public async Task Consume(ConsumeContext<RoutingSlipCompensationFailed> context)
{
var request = context.Message.GetVariable<TRequest>("Request");
var requestId = context.Message.GetVariable<Guid>("RequestId"); Uri faultAddress = null;
if (context.Message.Variables.ContainsKey("FaultAddress"))
faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
faultAddress = context.Message.GetVariable<Uri>("ResponseAddress"); if (faultAddress == null)
throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}"); var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false); var response = await CreateCompensationFaultedResponseMessage(context, request, requestId); await endpoint.Send(response).ConfigureAwait(false);
}
protected abstract Task<TResponse> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, TRequest request); protected abstract Task<TFaultResponse> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, TRequest request, Guid requestId);
protected abstract Task<TFaultResponse> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, TRequest request, Guid requestId);
}

Response 派生類 ,這裡邏輯可以隨自己定義,我也是隨便寫了個 CommonResponse和一個業務錯誤拋錯(犧牲了一點效能)。

    public class CreateOrderResponseProxy :
RoutingSlipDefaultResponseProxy<CreateOrderCommand, CommonCommandResponse<CreateOrderResult>, CommonCommandResponse<CreateOrderResult>>
{ protected override Task<CommonCommandResponse<CreateOrderResult>> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, CreateOrderCommand request)
{ return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = ,
Result = new CreateOrderResult
{
Message = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.Message))?.ToString(),
OrderId = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.OrderId))?.ToString(),
}
});
}
protected override Task<CommonCommandResponse<CreateOrderResult>> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, CreateOrderCommand request, Guid requestId)
{
var commonActivityExecuteFaildException = context.Message.ActivityExceptions.FirstOrDefault(m => m.ExceptionInfo.ExceptionType == typeof(CommonActivityExecuteFaildException).FullName);
if (commonActivityExecuteFaildException != null)
{
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = ,
Message = commonActivityExecuteFaildException.ExceptionInfo.Message
});
}
// system error log here
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = ,
Message = "System error"
});
} protected override Task<CommonCommandResponse<CreateOrderResult>> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, CreateOrderCommand request, Guid requestId)
{
var exception = context.Message.ExceptionInfo;
// lg here context.Message.ExceptionInfo
return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
{
Status = ,
Message = "System error"
});
}
}

對於CompensationFailed 的處理 通過ActivityCompensateErrorTransportFilter 實現 傳送到錯誤訊息佇列,後續通過prometheus + rabbitmq-exporter + alertmanager 觸發告警 通知相關人員處理。

  public class ActivityCompensateErrorTransportFilter<TActivity, TLog> : IFilter<CompensateActivityContext<TActivity, TLog>>
where TActivity : class, ICompensateActivity<TLog>
where TLog : class
{
public void Probe(ProbeContext context)
{
context.CreateFilterScope("moveFault");
} public async Task Send(CompensateActivityContext<TActivity, TLog> context, IPipe<CompensateActivityContext<TActivity, TLog>> next)
{
try
{
await next.Send(context).ConfigureAwait(false);
}
catch(Exception ex)
{
if (!context.TryGetPayload(out IErrorTransport transport))
throw new TransportException(context.ReceiveContext.InputAddress, $"The {nameof(IErrorTransport)} was not available on the {nameof(ReceiveContext)}.");
var exceptionReceiveContext = new RescueExceptionReceiveContext(context.ReceiveContext, ex);
await transport.Send(exceptionReceiveContext);
}
}
}

註冊 filter

    public class RoutingSlipCompensateErrorSpecification<TActivity, TLog> : IPipeSpecification<CompensateActivityContext<TActivity, TLog>>
where TActivity : class, ICompensateActivity<TLog>
where TLog : class
{
public void Apply(IPipeBuilder<CompensateActivityContext<TActivity, TLog>> builder)
{
builder.AddFilter(new ActivityCompensateErrorTransportFilter<TActivity, TLog>());
} public IEnumerable<ValidationResult> Validate()
{
yield return this.Success("success");
}
} cfg.ReceiveEndpoint("DeductStock_compensate", ep =>
{
ep.PrefetchCount = ;
ep.CompensateActivityHost<DeductStockActivity, DeductStockLog>(context.Container, conf =>
{
conf.AddPipeSpecification(new RoutingSlipCompensateErrorSpecification<DeductStockActivity, DeductStockLog>());
}); });

實現建立產品(建立完成+新增庫存)

實現了 建立訂單的功能,整個流程其實是同步的,我在想能不能實現最為簡單的最終一致性 比如 建立一個產品 ,然後非同步生成它的庫存 ,我發現是可以的,因為我們可以監聽到每一個Execute Activity 的完成事件,並且把出錯時的資訊通過 filter 塞到 錯誤佇列中。

這裡的程式碼就不貼了,詳情請看 demo