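Implementing eventual consistency with MassTransit's Request/Response and Courier features
Introduction
In today's .NET ecosystem, choosing a component for eventual consistency has long been a problem. The local transaction table approach (CAP) requires adding a message table to every service's database, and it cannot handle transactions like this one: creating an order requires both sufficient balance and sufficient stock, where stock and balance live in two different services. MassTransit is the solution I mainly use today. In the past I generally used MassTransit sagas to achieve eventual consistency, but as concurrency grows this inevitably puts heavy pressure on the database that persists the saga state. Following an answer on Stack Overflow, I found an approach that achieves eventual consistency with the Request/Response and Courier features. Demo link.
Request/Response in MassTransit
Message DTO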
public class SampleMessageCommand { }
Consumer
public class SampleMessageCommandHandler : IConsumer<SampleMessageCommand>
{
    public async Task Consume(ConsumeContext<SampleMessageCommand> context)
    {
        await context.RespondAsync(new SampleMessageCommandResult() { Data = "Sample" });
    }
}
Response DTO
public class SampleMessageCommandResult { public string Data { get; set; } }
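The calling and registration code is omitted here; see the official documentation for details.
In essence, Request/Response is implemented on top of message queues. The client (producer) sends the request message to the designated queue, stamped with a RequestId and a ResponseAddress (a temporary queue in RabbitMQ). The server (consumer) consumes the message and sends its reply to that ResponseAddress. When the client receives the response message, it matches the RequestId to find the pending request and returns the result.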
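The RequestId matching described above can be sketched in plain C#. This is only an in-memory illustration and not MassTransit's actual implementation; the PendingRequests class and its method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the client side of Request/Response.
// It only illustrates the idea of correlating a response by RequestId.
public sealed class PendingRequests<TResponse>
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>> pending =
        new ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>>();

    // Client side: remember the request and hand back a task
    // that completes once the matching response arrives.
    public (Guid RequestId, Task<TResponse> Response) Register()
    {
        var requestId = Guid.NewGuid();
        var source = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        pending[requestId] = source;
        return (requestId, source.Task);
    }

    // Called when a message is received on the response queue.
    // Returns false when no pending request has that RequestId.
    public bool Complete(Guid requestId, TResponse response)
    {
        return pending.TryRemove(requestId, out var source) && source.TrySetResult(response);
    }
}
```

A response carrying an unknown or already-answered RequestId is simply ignored by this sketch, which is why Complete reports whether a match was found.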
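Courier in MassTransit
A routing slip is built by composing a series of activities in order. Each activity (leaving aside execute-only activities) has two methods, Execute and Compensate. Compensate undoes the effects of Execute (that is, it rolls Execute back). At the end of Execute, each activity calls Completed to record in the message the information needed for rollback, so that information ends up persisted inside a message in the message queue.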
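To make the execute/compensate ordering concrete, here is a minimal in-process sketch. It assumes that completed activities are compensated in reverse order when a later activity fails. SketchActivity and SketchRoutingSlip are hypothetical names; the real Courier moves the slip between queues rather than looping in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of an activity; not a MassTransit type.
public sealed class SketchActivity
{
    public string Name { get; }
    public Func<string> Execute { get; }        // returns the log needed to undo the step
    public Action<string> Compensate { get; }   // null for execute-only activities

    public SketchActivity(string name, Func<string> execute, Action<string> compensate = null)
    {
        Name = name;
        Execute = execute;
        Compensate = compensate;
    }
}

public static class SketchRoutingSlip
{
    // Runs the itinerary in order. When an activity throws, the activities that
    // already completed are compensated in reverse order using their saved logs.
    // Returns true only when every activity completed.
    public static bool Run(IReadOnlyList<SketchActivity> itinerary, List<string> trace)
    {
        var completed = new Stack<(SketchActivity Activity, string Log)>();
        foreach (var activity in itinerary)
        {
            try
            {
                var log = activity.Execute();
                trace.Add("executed " + activity.Name);
                if (activity.Compensate != null)
                    completed.Push((activity, log));
            }
            catch (Exception)
            {
                trace.Add("faulted " + activity.Name);
                while (completed.Count > 0)
                {
                    var (done, log) = completed.Pop();
                    done.Compensate(log);
                    trace.Add("compensated " + done.Name);
                }
                return false;
            }
        }
        return true;
    }
}
```

The sketch does not cover a failing Compensate; that case needs separate handling.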
The balance deduction activity. Here DeductBalanceModel is the data model of the deduction request, and DeductBalanceLog holds the information needed for rollback.
public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
{
    private readonly ILogger<DeductBalanceActivity> logger;

    public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
    {
        this.logger = logger;
    }

    public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
    {
        logger.LogInformation("Restoring balance");
        var log = context.Log; // everything saved when Execute completed is available here
        //throw new ArgumentException("some things were wrong");
        return context.Compensated();
    }

    public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
    {
        logger.LogInformation("Deducting balance");
        await Task.Delay(100);
        return context.Completed(new DeductBalanceLog() { Price = 100 });
    }
}
The stock deduction activity
public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
{
    private readonly ILogger<DeductStockActivity> logger;

    public DeductStockActivity(ILogger<DeductStockActivity> logger)
    {
        this.logger = logger;
    }

    public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
    {
        var log = context.Log;
        logger.LogInformation("Restoring stock");
        return context.Compensated();
    }

    public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
    {
        var argument = context.Arguments;
        logger.LogInformation("Deducting stock");
        await Task.Delay(100);
        return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
    }
}
The order creation execute activity
public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
{
    private readonly ILogger<CreateOrderActivity> logger;

    public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
    {
        this.logger = logger;
    }

    public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
    {
        logger.LogInformation("Creating order");
        await Task.Delay(100);
        //throw new CommonActivityExecuteFaildException("The daily order limit has been reached");
        return context.CompletedWithVariables(new CreateOrderResult { OrderId = "111122", Message = "Order created successfully" });
    }
}
Assemble the activities above into a routing slip. The composition is ordered: deduct stock => deduct balance => create order.
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DeductStock",
    new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"),
    new DeductStockModel { ProductId = request.Message.ProductId });
builder.AddActivity("DeductBalance",
    new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"),
    new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price });
builder.AddActivity("CreateOrder",
    new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"),
    new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });
var routingSlip = builder.Build();
Executing the routing slip
await bus.Execute(routingSlip);
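This call returns nothing, and all activities run asynchronously. Either every activity completes, or one of them fails and the ones already completed are rolled back. (There is actually a worse case: Compensate itself fails. By default MassTransit only publishes a compensation-failed message. Later, in the create-order section, I push the failed message to the error queue so that, once the Compensate bug is fixed, it can be moved back to the normal queue to repair the data.) On its own, though, this feature cannot satisfy the create-order requirement: after await bus.Execute(routingSlip) we have no idea whether the order was created or whether it failed because stock or balance was insufficient (everything is asynchronous).
Fortunately, a routing slip produces many messages while it executes, such as RoutingSlipCompleted, RoutingSlipCompensationFailed, RoutingSlipActivityCompleted and RoutingSlipActivityFaulted (see the documentation for details). We can subscribe to these events and combine them with Request/Response to implement order creation.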
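The idea of turning a terminal routing slip event into a reply can be sketched without MassTransit as follows. All types here are hypothetical, the variables dictionary stands in for the variables carried by the slip, and the status codes 1, 2 and 3 are an assumption made for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; they are not part of MassTransit.
public enum SketchTerminalEvent { Completed, Faulted, CompensationFailed }

public sealed class SketchResponse
{
    public Guid RequestId { get; set; }
    public int Status { get; set; }
    public string Message { get; set; }
}

public static class SketchResponseMapper
{
    // Builds the reply for a terminal event from the variables the slip carried along.
    // The RequestId must have been stored in the slip when the request was consumed.
    public static SketchResponse Map(SketchTerminalEvent terminalEvent, IDictionary<string, object> variables)
    {
        if (!variables.TryGetValue("RequestId", out var raw) || !(raw is Guid requestId))
            throw new ArgumentException("The RequestId variable is missing from the routing slip.");

        switch (terminalEvent)
        {
            case SketchTerminalEvent.Completed:
                return new SketchResponse { RequestId = requestId, Status = 1, Message = "Completed" };
            case SketchTerminalEvent.Faulted:
                return new SketchResponse { RequestId = requestId, Status = 2, Message = "Faulted and rolled back" };
            default:
                return new SketchResponse { RequestId = requestId, Status = 3, Message = "Compensation failed" };
        }
    }
}
```

Because the RequestId travels inside the slip, the consumer that receives the terminal event needs no local state to know which request to answer.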
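Implementing the create-order long-running flow (sufficient stock + sufficient balance)
The create-order command
/// <summary>
/// Long-running flow, distributed transaction
/// </summary>
public class CreateOrderCommand
{
    public string ProductId { get; set; }
    public string CustomerId { get; set; }
    public int Price { get; set; }
}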
Step one of the transaction: the stock deduction code
public class DeductStockActivity : IActivity<DeductStockModel, DeductStockLog>
{
    private readonly ILogger<DeductStockActivity> logger;

    public DeductStockActivity(ILogger<DeductStockActivity> logger)
    {
        this.logger = logger;
    }

    public async Task<CompensationResult> Compensate(CompensateContext<DeductStockLog> context)
    {
        var log = context.Log;
        logger.LogInformation("Restoring stock");
        return context.Compensated();
    }

    public async Task<ExecutionResult> Execute(ExecuteContext<DeductStockModel> context)
    {
        var argument = context.Arguments;
        logger.LogInformation("Deducting stock");
        await Task.Delay(100);
        return context.Completed(new DeductStockLog() { ProductId = argument.ProductId, Amount = 1 });
    }
}

public class DeductStockModel
{
    public string ProductId { get; set; }
}

public class DeductStockLog
{
    public string ProductId { get; set; }
    public int Amount { get; set; }
}
Step two of the transaction: the balance deduction code
public class DeductBalanceActivity : IActivity<DeductBalanceModel, DeductBalanceLog>
{
    private readonly ILogger<DeductBalanceActivity> logger;

    public DeductBalanceActivity(ILogger<DeductBalanceActivity> logger)
    {
        this.logger = logger;
    }

    public async Task<CompensationResult> Compensate(CompensateContext<DeductBalanceLog> context)
    {
        logger.LogInformation("Restoring balance");
        var log = context.Log;
        //throw new ArgumentException("some things were wrong");
        return context.Compensated();
    }

    public async Task<ExecutionResult> Execute(ExecuteContext<DeductBalanceModel> context)
    {
        logger.LogInformation("Deducting balance");
        await Task.Delay(100);
        return context.Completed(new DeductBalanceLog() { Price = 100 });
    }
}

public class DeductBalanceModel
{
    public string CustomerId { get; set; }
    public int Price { get; set; }
}

public class DeductBalanceLog
{
    public int Price { get; set; }
}
Step three of the transaction: the order creation code
public class CreateOrderActivity : IExecuteActivity<CreateOrderModel>
{
    private readonly ILogger<CreateOrderActivity> logger;

    public CreateOrderActivity(ILogger<CreateOrderActivity> logger)
    {
        this.logger = logger;
    }

    public async Task<ExecutionResult> Execute(ExecuteContext<CreateOrderModel> context)
    {
        logger.LogInformation("Creating order");
        await Task.Delay(100);
        //throw new CommonActivityExecuteFaildException("The daily order limit has been reached");
        return context.CompletedWithVariables(new CreateOrderResult { OrderId = "111122", Message = "Order created successfully" });
    }
}

public class CreateOrderModel
{
    public string ProductId { get; set; }
    public string CustomerId { get; set; }
    public int Price { get; set; }
}

public class CreateOrderResult
{
    public string OrderId { get; set; }
    public string Message { get; set; }
}
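I consume the create-order request and read its response address and RequestId, both of which are needed later to send the response. I store them in the routing slip and subscribe to three events: RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed. When one of these messages arrives, I build the response from the event type and the response address and RequestId stored in the routing slip earlier. That is roughly the whole process; if it is unclear, see the demo. Because every transaction follows similar RoutingSlip + Request/Response steps, they can be abstracted (template method pattern), deferring the assembly of the activities to derived classes. MassTransit ships such a proxy class, but the official one does not cover the CompensationFailed case, so I simply wrote my own.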
public abstract class RoutingSlipDefaultRequestProxy<TRequest> : IConsumer<TRequest>
    where TRequest : class
{
    public async Task Consume(ConsumeContext<TRequest> context)
    {
        var builder = new RoutingSlipBuilder(NewId.NextGuid());

        // Have the terminal events delivered back to this endpoint.
        builder.AddSubscription(context.ReceiveContext.InputAddress,
            RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);

        // Carry everything needed to answer the original request inside the slip.
        builder.AddVariable("RequestId", context.RequestId);
        builder.AddVariable("ResponseAddress", context.ResponseAddress);
        builder.AddVariable("FaultAddress", context.FaultAddress);
        builder.AddVariable("Request", context.Message);

        await BuildRoutingSlip(builder, context);

        var routingSlip = builder.Build();
        await context.Execute(routingSlip).ConfigureAwait(false);
    }

    protected abstract Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<TRequest> request);
}
This is the derived class, which assembles the routing slip
public class CreateOrderRequestProxy : RoutingSlipDefaultRequestProxy<CreateOrderCommand>
{
    private readonly IConfiguration configuration;

    public CreateOrderRequestProxy(IConfiguration configuration)
    {
        this.configuration = configuration;
    }

    protected override Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<CreateOrderCommand> request)
    {
        builder.AddActivity("DeductStock",
            new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductStock_execute"),
            new DeductStockModel { ProductId = request.Message.ProductId });
        builder.AddActivity("DeductBalance",
            new Uri($"{configuration["RabbitmqConfig:HostUri"]}/DeductBalance_execute"),
            new DeductBalanceModel { CustomerId = request.Message.CustomerId, Price = request.Message.Price });
        builder.AddActivity("CreateOrder",
            new Uri($"{configuration["RabbitmqConfig:HostUri"]}/CreateOrder_execute"),
            new CreateOrderModel { Price = request.Message.Price, CustomerId = request.Message.CustomerId, ProductId = request.Message.ProductId });
        return Task.CompletedTask;
    }
}
The response base class, which mainly handles the three cases.
public abstract class RoutingSlipDefaultResponseProxy<TRequest, TResponse, TFaultResponse> :
    IConsumer<RoutingSlipCompensationFailed>,
    IConsumer<RoutingSlipCompleted>,
    IConsumer<RoutingSlipFaulted>
    where TRequest : class
    where TResponse : class
    where TFaultResponse : class
{
    public async Task Consume(ConsumeContext<RoutingSlipCompleted> context)
    {
        var request = context.Message.GetVariable<TRequest>("Request");
        var requestId = context.Message.GetVariable<Guid>("RequestId");

        Uri responseAddress = null;
        if (context.Message.Variables.ContainsKey("ResponseAddress"))
            responseAddress = context.Message.GetVariable<Uri>("ResponseAddress");
        if (responseAddress == null)
            throw new ArgumentException($"The response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

        var endpoint = await context.GetResponseEndpoint<TResponse>(responseAddress, requestId).ConfigureAwait(false);
        var response = await CreateResponseMessage(context, request);
        await endpoint.Send(response).ConfigureAwait(false);
    }

    public async Task Consume(ConsumeContext<RoutingSlipFaulted> context)
    {
        var request = context.Message.GetVariable<TRequest>("Request");
        var requestId = context.Message.GetVariable<Guid>("RequestId");

        // Prefer the fault address and fall back to the response address.
        Uri faultAddress = null;
        if (context.Message.Variables.ContainsKey("FaultAddress"))
            faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
        if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
            faultAddress = context.Message.GetVariable<Uri>("ResponseAddress");
        if (faultAddress == null)
            throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

        var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false);
        var response = await CreateFaultedResponseMessage(context, request, requestId);
        await endpoint.Send(response).ConfigureAwait(false);
    }

    public async Task Consume(ConsumeContext<RoutingSlipCompensationFailed> context)
    {
        var request = context.Message.GetVariable<TRequest>("Request");
        var requestId = context.Message.GetVariable<Guid>("RequestId");

        // Prefer the fault address and fall back to the response address.
        Uri faultAddress = null;
        if (context.Message.Variables.ContainsKey("FaultAddress"))
            faultAddress = context.Message.GetVariable<Uri>("FaultAddress");
        if (faultAddress == null && context.Message.Variables.ContainsKey("ResponseAddress"))
            faultAddress = context.Message.GetVariable<Uri>("ResponseAddress");
        if (faultAddress == null)
            throw new ArgumentException($"The fault/response address could not be found for the faulted routing slip: {context.Message.TrackingNumber}");

        var endpoint = await context.GetFaultEndpoint<TResponse>(faultAddress, requestId).ConfigureAwait(false);
        var response = await CreateCompensationFaultedResponseMessage(context, request, requestId);
        await endpoint.Send(response).ConfigureAwait(false);
    }

    protected abstract Task<TResponse> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, TRequest request);

    protected abstract Task<TFaultResponse> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, TRequest request, Guid requestId);

    protected abstract Task<TFaultResponse> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, TRequest request, Guid requestId);
}
The derived response class. The logic here is up to you; I just wrote a simple CommonResponse and report business errors by throwing an exception (at a small performance cost).
public class CreateOrderResponseProxy :
    RoutingSlipDefaultResponseProxy<CreateOrderCommand, CommonCommandResponse<CreateOrderResult>, CommonCommandResponse<CreateOrderResult>>
{
    protected override Task<CommonCommandResponse<CreateOrderResult>> CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, CreateOrderCommand request)
    {
        return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
        {
            Status = 1,
            Result = new CreateOrderResult
            {
                Message = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.Message))?.ToString(),
                OrderId = context.Message.Variables.TryGetAndReturn(nameof(CreateOrderResult.OrderId))?.ToString(),
            }
        });
    }

    protected override Task<CommonCommandResponse<CreateOrderResult>> CreateFaultedResponseMessage(ConsumeContext<RoutingSlipFaulted> context, CreateOrderCommand request, Guid requestId)
    {
        // A business error thrown by an activity is reported with its own message.
        var commonActivityExecuteFaildException = context.Message.ActivityExceptions
            .FirstOrDefault(m => m.ExceptionInfo.ExceptionType == typeof(CommonActivityExecuteFaildException).FullName);
        if (commonActivityExecuteFaildException != null)
        {
            return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
            {
                Status = 2,
                Message = commonActivityExecuteFaildException.ExceptionInfo.Message
            });
        }

        // system error log here
        return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
        {
            Status = 3,
            Message = "System error"
        });
    }

    protected override Task<CommonCommandResponse<CreateOrderResult>> CreateCompensationFaultedResponseMessage(ConsumeContext<RoutingSlipCompensationFailed> context, CreateOrderCommand request, Guid requestId)
    {
        var exception = context.Message.ExceptionInfo;
        // log context.Message.ExceptionInfo here
        return Task.FromResult(new CommonCommandResponse<CreateOrderResult>
        {
            Status = 3,
            Message = "System error"
        });
    }
}
CompensationFailed is handled by ActivityCompensateErrorTransportFilter, which sends the message to the error queue. Afterwards, prometheus + rabbitmq-exporter + alertmanager raise an alert to notify the people responsible.
public class ActivityCompensateErrorTransportFilter<TActivity, TLog> : IFilter<CompensateActivityContext<TActivity, TLog>>
    where TActivity : class, ICompensateActivity<TLog>
    where TLog : class
{
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("moveFault");
    }

    public async Task Send(CompensateActivityContext<TActivity, TLog> context, IPipe<CompensateActivityContext<TActivity, TLog>> next)
    {
        try
        {
            await next.Send(context).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Compensation failed: move the message to the error queue.
            if (!context.TryGetPayload(out IErrorTransport transport))
                throw new TransportException(context.ReceiveContext.InputAddress, $"The {nameof(IErrorTransport)} was not available on the {nameof(ReceiveContext)}.");

            var exceptionReceiveContext = new RescueExceptionReceiveContext(context.ReceiveContext, ex);
            await transport.Send(exceptionReceiveContext);
        }
    }
}
Registering the filter
public class RoutingSlipCompensateErrorSpecification<TActivity, TLog> : IPipeSpecification<CompensateActivityContext<TActivity, TLog>>
    where TActivity : class, ICompensateActivity<TLog>
    where TLog : class
{
    public void Apply(IPipeBuilder<CompensateActivityContext<TActivity, TLog>> builder)
    {
        builder.AddFilter(new ActivityCompensateErrorTransportFilter<TActivity, TLog>());
    }

    public IEnumerable<ValidationResult> Validate()
    {
        yield return this.Success("success");
    }
}

cfg.ReceiveEndpoint("DeductStock_compensate", ep =>
{
    ep.PrefetchCount = 100;
    ep.CompensateActivityHost<DeductStockActivity, DeductStockLog>(context.Container, conf =>
    {
        conf.AddPipeSpecification(new RoutingSlipCompensateErrorSpecification<DeductStockActivity, DeductStockLog>());
    });
});
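Implementing product creation (creation completes + stock is added)
With order creation implemented, the whole flow is effectively synchronous from the caller's point of view. I wondered whether the simplest form of eventual consistency could be implemented as well, for example creating a product and then generating its stock asynchronously. It turns out it can, because we can listen for the completion event of each execute activity and use a filter to push the error information to the error queue when something fails.
The code is not included here; see the demo for details.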