CAP-微服務間通訊實踐
微服務間通訊常見的兩種方式
由於微服務架構慢慢被更多人使用後,迎面而來的問題是如何做好微服務間通訊的方案。我們先分析下目前最常用的兩種服務間通訊方案。
gRPC(rpc遠端呼叫)
- 場景:A服務主動發起請求到B服務,同步方式
- 範圍:只在微服務間通訊應用
EventBus(基於訊息佇列的整合事件)
- 技術:NotNetCore.Cap + Rabbitmq + Database
- 場景:A服務要在B服務做某件事情後響應,非同步方式
- 實現:B服務在完成某件事情後釋出訊息,A服務訂閱此訊息
- 範圍:只在微服務間通訊應用
通過對比,兩種方式完全不一樣。rpc是類似於http請求的及時響應機制,但是比http更輕量、快捷,它更像以前的微軟的WCF,可以自動生成客戶端程式碼,充分體現了面向實體物件的遠端呼叫的思想;Eventbus是非同步的訊息機制,基於cap的思想,不關心下游訂閱方服務是否消費成功,保障了主服務業務的流暢性,同時也是一款分散式事務的實現方案,可以保障分散式架構中的資料的最終一致性。
我們今天主要介紹CAP在微服務中的實踐案例。
搭建框架介紹
新建專案
- 新建解決方案 DotNetCore.Cap.Demo
- 新建專案 DotNetCore.Cap.Demo.Publisher 訊息釋出端
- 新建專案 DotNetCore.Cap.Demo.Subscriber 訊息訂閱端
主要sdk
- 專案框架 netcoreapp 3.1
- 訊息佇列選用RabbitMQ
- 資料庫儲存選用PostgreSql
根據實際情況選擇合適的訊息佇列和資料庫儲存
CAP 支援 Kafka、RabbitMQ、AzureServiceBus 訊息佇列:
PM> Install-Package DotNetCore.CAP.Kafka PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.AzureServiceBus
CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 作為資料庫儲存:
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB
本次demo使用的nuget包如下所示
$ dotnet list package 專案“DotNetCore.Cap.Demo.Publisher”具有以下包引用 [netcoreapp3.1]: 頂級包 已請求 已解決 > DotNetCore.CAP 3.1.1 3.1.1 > DotNetCore.CAP.PostgreSql 3.1.1 3.1.1 > DotNetCore.CAP.RabbitMQ 3.1.1 3.1.1 > Microsoft.VisualStudio.Azure.Containers.Tools.Targets 1.10.9 1.10.9 > Npgsql.EntityFrameworkCore.PostgreSQL 3.1.4 3.1.4 > Npgsql.EntityFrameworkCore.PostgreSQL.Design 1.1.0 1.1.0 專案“DotNetCore.Cap.Demo.Subscriber”具有以下包引用 [netcoreapp3.1]: 頂級包 已請求 已解決 > DotNetCore.CAP 3.1.1 3.1.1 > DotNetCore.CAP.PostgreSql 3.1.1 3.1.1 > DotNetCore.CAP.RabbitMQ 3.1.1 3.1.1 > Microsoft.VisualStudio.Azure.Containers.Tools.Targets 1.10.9 1.10.9 > Npgsql.EntityFrameworkCore.PostgreSQL 3.1.4 3.1.4 > Npgsql.EntityFrameworkCore.PostgreSQL.Design 1.1.0 1.1.0
DotNetCore.Cap.Demo.Publisher 訊息釋出端
修改Startup檔案
注入dotnetcore.cap元件及資料庫上下文
services.AddDbContext<PgDbContext>(p => p.UseNpgsql("資料庫連線字串"));
services.AddCap(x =>
{
//rabbitmq在docker執行時,需要對映兩個埠:5672和15672
//5672供程式集訪問
//15672供web訪問
//預設使用者名稱和密碼為:guest/guest
x.UseRabbitMQ(p =>
{
p.HostName = "localhost";
p.Port = 5672;
p.UserName = "guest";
p.Password = "guest";
});
x.UseEntityFramework<PgDbContext>();
//x.FailedRetryCount = 1;
//x.UseDashboard();
});
新建Controller作為Publisher
- 建構函式注入DotNetCore.CAP.ICapPublisher
提供了同步和非同步的訊息釋出方法
- 釋出字串訊息
await _capPublisher.PublishAsync("訊息名稱", "訊息內容");
[HttpGet("string")]
public async Task<IActionResult> PublishString()
{
await _capPublisher.PublishAsync("sample.rabbitmq.demo.string", "this is text!");
return Ok();
}
- 釋出物件
await _capPublisher.PublishAsync("訊息名稱", "訊息物件");
[HttpGet("dynamic")]
public async Task<IActionResult> PublishDynamic()
{
await _capPublisher.PublishAsync("sample.rabbitmq.demo.dynamic", new
{
Name = "xiao gou",
Age = 18
});
return Ok();
}
- 分散式事務場景
當需要在資料庫操作後,釋出訊息出去,DotNetCore.Cap也提供了分散式事務的解決方案。它擴充套件的transcation能保證只有在資料庫操作和訊息傳送都完成後,才提交Commit
[HttpGet("transcation")]
public async Task<IActionResult> PublishWithTranscation()
{
using (var trans = _pgDbContext.Database.BeginTransaction(_capPublisher))
{
var apiConfig = new ApiConfig
{
ApiName = "111122",
ApiDesc = "223",
ReturnType = "1",
ReturnExpect = "1",
IsAsync = true,
OperCode = "999",
OperTime = DateTime.Now
};
await _pgDbContext.ApiConfig.AddAsync(apiConfig);
await _pgDbContext.SaveChangesAsync();
_capPublisher.Publish("sample.rabbitmq.demo.transcation", apiConfig);
trans.Commit();
return Ok();
}
}
DotNetCore.Cap.Demo.Subscriber 訊息訂閱端
修改Startup檔案
注入dotnetcore.cap元件及資料庫上下文
services.AddDbContext<PgDbContext>(p => p.UseNpgsql("資料庫連線字串"));
services.AddCap(x =>
{
//rabbitmq在docker執行時,需要對映兩個埠:5672和15672
//5672供程式集訪問
//15672供web訪問
//預設使用者名稱和密碼為:guest/guest
x.UseRabbitMQ(p =>
{
p.HostName = "localhost";
p.Port = 5672;
p.UserName = "guest";
p.Password = "guest";
});
x.UseEntityFramework<PgDbContext>();
//x.FailedRetryCount = 1;
//x.UseDashboard();
});
新建Controller作為Subscriber
- 訂閱字串訊息
[NonAction] 標籤:Indicates that a controller method is not an action method.
[CapSubscribe] 標籤:標誌此方法為訂閱方法,並以訊息名稱匹配發布端的訊息事件
[NonAction]
[CapSubscribe("sample.rabbitmq.demo.string")]
public void SubscriberString(string text)
{
Console.WriteLine($"【SubscriberString】Subscriber invoked, Info: {text}");
}
- 訂閱物件訊息
方法入參person即為訊息體
[NonAction]
[CapSubscribe("sample.rabbitmq.demo.dynamic")]
public void SubscriberDynamic(dynamic person)
{
Console.WriteLine($"【SubscriberDynamic】Subscriber invoked, Info: {person.Name} {person.Age}");
}
新建Service作為Subscriber
- 除了Controller可以作為訊息訂閱端外,也可以用繼承自DotNetCore.CAP.ICapSubscribe介面的Service作為訂閱端
- Controller作為訂閱者時,不用繼承ICapSubscribe;Service作為訂閱者時,必須繼承ICapSubscribe
- Controller和Service同時訂閱了一個訊息時,只觸發了Service的消費;若要多個消費,需要在不同的Group下
using DotNetCore.CAP;
using System;
namespace DotNetCore.Cap.Demo.Subscriber.Services
{
/// <summary>
/// 消費訂閱服務
/// </summary>
public class SubscriberService : ICapSubscribe
{
[CapSubscribe("sample.rabbitmq.demo.string")]
public void SubscriberString(string text)
{
Console.WriteLine($"【SubscriberString】Subscriber invoked, Info: {text}");
}
[CapSubscribe("sample.rabbitmq.demo.dynamic")]
public void SubscriberDynamic(dynamic person)
{
Console.WriteLine($"【SubscriberDynamic】Subscriber invoked, Info: {person.Name} {person.Age}");
}
[CapSubscribe("sample.rabbitmq.demo.object")]
public void SubscriberObject(Person person)
{
Console.WriteLine($"【SubscriberObject】Subscriber invoked, Info: {person.Name} {person.Age}");
}
[CapSubscribe("sample.rabbitmq.demo.trans")]
public void SubscriberTrans(ApiConfig apiConfig)
{
Console.WriteLine($"【SubscriberTrans】Subscriber invoked, Info: {apiConfig.Id}");
}
}
}
總結
- DotNetCore.Cap 是一種非同步訊息的通訊,可以作為微服務間通訊的一種方式
- DotNetCore.Cap 為微服務架構提供了分散式事務的解決方案,保障了資料的最終一致性
- 開發中,應根據實際業務需求和場景,選擇合適可靠的微服務間通訊方案
參考
https://github.com/dotnetcore/CAP/blob/master/README.md
https://book.douban.com/subject/33425123/