1. 程式人生 > 其它 >.NET Core 事件匯流排,分散式事務解決方案:CAP

.NET Core 事件匯流排,分散式事務解決方案:CAP

本文地址:http://www.cnblogs.com/savorboard/p/cap.html

作者部落格:Savorboard

背景

相信前面幾篇關於微服務的文章也介紹了那麼多了,在構建微服務的過程中確實需要這麼一個東西,即便不是在構建微服務,那麼在構建分散式應用的過程中也會遇到分散式事務的問題,那麼 CAP 就是在這樣的背景下誕生的。

最初打算做這個東西是在去年(2016)年底,最初是為了解決分散式系統中的分散式事務的問題,然後當時有了一個大概的概念輪廓,當時我對於前面兩篇文章中關於非同步訊息和微服務之間通訊還不是太瞭解,只是覺得這樣能夠解決這一系列的問題,然後就著手做了,最後發現和這些概念竟然不謀而合。

經過大半年的不斷重構以及修改,最終 CAP 1.0 版本釋出了。作為一個開源專案,最初專案是在我的個人Github下,然後於上個月已經貢獻給了.NET China Foundation組織,目前該專案由我和 DotNetCore 專案組共同維護。

CAP 介紹

Github:https://github.com/dotnetcore/CAP

開源協議:MIT

CAP 是一個在分散式系統中(SOA,MicroService)實現事件匯流排及最終一致性(分散式事務)的一個開源的 C# 庫,她具有輕量級,高效能,易使用等特點。

你可以輕鬆的在基於 .NET Core 技術的分散式系統中引入CAP,包括但限於 ASP.NET Core 和 ASP.NET Core on .NET Framework。

CAP 以 NuGet 包的形式提供,對專案無任何入侵,你仍然可以以你喜愛的方式來構建分散式系統。

CAP 具有 Event Bus 的所有功能,並且CAP提供了更加簡化的方式來處理EventBus中的釋出/訂閱。

CAP 具有訊息持久化的功能,也就是當你的服務進行重啟或者宕機時,她可以保證訊息的可靠性。

CAP 實現了分散式事務中的最終一致性,你不用再去處理這些瑣碎的細節。

CAP 提供了基於 Microsoft DI 的 API 服務,她可以和你的 ASP.NET Core 系統進行無縫結合,並且能夠和你的業務程式碼整合支援強一致性的事務處理。

CAP 是開源免費的。CAP基於MIT協議開源,你可以免費的在你的私人或者商業專案中使用,不會有人向你收取任何費用。

Getting Started

目前, CAP 同時支援使用 RabbitMQ,Kafka,Azure Service Bus 等進行底層之間的訊息傳送,你不需要具備這些訊息佇列的使用經驗,仍然可以輕鬆的整合到專案中。

CAP 目前支援使用 Sql Server,MySql,PostgreSql,MongoDB 資料庫的專案。

CAP 同時支援使用 EntityFrameworkCore 和 ADO.NET 的專案,你可以根據需要選擇不同的配置方式。

下面是CAP在系統中的一個不完全示意圖:

圖中實線部分代表使用者程式碼,虛線部分代表CAP內部實現。

下面,我們看一下 CAP 怎麼整合到專案中:

Step 1:

你可以執行下面的命令來安裝CAP NuGet 包:

PM> Install-Package DotNetCore.CAP

根據底層訊息佇列,你可以選擇引入不同的包:

PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus

CAP 目前支援使用 SQL Server, PostgreSql, MySql, MongoDB 的專案,你可以選擇引入不同的包:

PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB     //需要 MongoDB 4.0+ 叢集

Step 2:

Startup.cs檔案中,新增如下配置:

public void ConfigureServices(IServiceCollection services)
{
    ......

    services.AddDbContext<AppDbContext>();

    services.AddCap(x =>
    {
        //如果你使用的 EF 進行資料操作,你需要新增如下配置:
        x.UseEntityFramework<AppDbContext>();  //可選項,你不需要再次配置 x.UseSqlServer 了
		
        //如果你使用的ADO.NET,根據資料庫選擇進行配置:
        x.UseSqlServer("資料庫連線字串");
        x.UseMySql("資料庫連線字串");
        x.UsePostgreSql("資料庫連線字串");

        //如果你使用的 MongoDB,你可以新增如下配置:
        x.UseMongoDB("ConnectionStrings");  //注意,僅支援MongoDB 4.0+叢集
	
        //CAP支援 RabbitMQ、Kafka、AzureServiceBus 等作為MQ,根據使用選擇配置:
        x.UseRabbitMQ("ConnectionStrings");
        x.UseKafka("ConnectionStrings");
        x.UseAzureServiceBus("ConnectionStrings");
    });
}

釋出事件/訊息

在 Controller 中注入ICapPublisher然後使用ICapPublisher進行訊息釋出:

public class PublishController : Controller
{
    private readonly ICapPublisher _capBus;

    public PublishController(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }
    
    //不使用事務
    [Route("~/without/transaction")]
    public IActionResult WithoutTransaction()
    {
        _capBus.Publish("xxx.services.show.time", DateTime.Now);
	
        return Ok();
    }

    //Ado.Net 中使用事務,自動提交
    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
            {
                //業務程式碼

                _capBus.Publish("xxx.services.show.time", DateTime.Now);
            }
        }
        return Ok();
    }

    //EntityFramework 中使用事務,自動提交
    [Route("~/ef/transaction")]
    public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
    {
        using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
        {
            //業務程式碼

            _capBus.Publish("xxx.services.show.time", DateTime.Now);
        }
        return Ok();
    }
}

訂閱事件/訊息

Controller中:
如果是在Controller中,直接新增[CapSubscribe("")]來訂閱相關訊息。

public class PublishController : Controller
{
    [NoAction]
    [CapSubscribe("xxx.services.show.time")]
    public async Task CheckReceivedMessage(DateTime time)
    {
    	Console.WriteLine(time);
    	return Task.CompletedTask;
    }
}

xxxService中:
如果你的方法沒有位於Controller 中,那麼你訂閱的類需要繼承ICapSubscribe,然後新增[CapSubscribe("")]標記:

namespace xxx.Service
{
    public interface ISubscriberService
    {
    	public void CheckReceivedMessage(DateTime time);
    }
    
    
    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
    	[CapSubscribe("xxxx.services.show.time")]
    	public void CheckReceivedMessage(DateTime time)
    	{
    		
    	}
    }
}

然後在Startup.cs中的 ConfigureServices() 中注入你的ISubscriberService

public void ConfigureServices(IServiceCollection services)
{
    services.AddTransient<ISubscriberService,SubscriberService>();
}

結束了,怎麼樣,是不是很簡單?

鳴謝

感謝lan Ye同學對本專案的英文翻譯工作。

感謝AlexLEWIS同學對本專案的其他支援。

感謝.NET China Foundation團隊成員對本專案的支援。

總結

如果你有任何問題,都可以去 Github 給我們提交 Issue,我們會在第一時間處理。

如果你覺得這個開源專案還不錯,給個Github Star 支援一下那就太好了。

如果你覺得本篇文章對你有幫助的話,可以關注一下博主,順手點個【推薦】哦。