1. 程式人生 > >.NET Core微服務之開源專案CAP的初步使用

.NET Core微服務之開源專案CAP的初步使用

一、CAP簡介

  CAP 是一個在分散式系統中(SOA,MicroService)實現事件匯流排及最終一致性(分散式事務)的一個開源的 C# 庫,她具有輕量級,高效能,易使用等特點。我們可以輕鬆的在基於 .NET Core 技術的分散式系統中引入CAP,包括但限於 ASP.NET Core 和 ASP.NET Core on .NET Framework。

  CAP 的應用場景主要有以下兩個:

  • 分散式事務中的最終一致性(非同步確保)的方案
  • 具有高可用性的 EventBus

  CAP 同時支援使用 RabbitMQ 或 Kafka 進行底層之間的訊息傳送,我們不需要具備 RabbitMQ 或者 Kafka 的使用經驗,仍然可以輕鬆的將CAP整合到專案中。

  CAP 目前支援使用 Sql Server,MySql,PostgreSql 資料庫的專案;

  CAP 同時支援使用 EntityFrameworkCore 和 Dapper 的專案,可以根據需要選擇不同的配置方式;

  CAP的作者為園友savorboard(楊曉東),成都地區的.NET社群領導者,棒棒噠!

二、案例結構

  此次試驗仍然和上一篇基於MassTransit的案例一樣(其實是我懶得再改,直接拿來複用),共有四個MicroService應用程式,當用戶下訂單時會通過CAP作為事件匯流排釋出訊息,作為訂閱者的庫存和配送服務會接收到訊息並消費訊息。此次試驗會採用RabbitMQ作為訊息佇列,採用MSSQL作為關係型資料庫(同時CAP也是支援MSSQL的)。

  準備工作:為所有服務通過NuGet安裝CAP及其相關包

PM> Install-Package DotNetCore.CAP

 下面是RabbitMQ的支援包

PM> Install-Package DotNetCore.CAP.RabbitMQ

 下面是MSSQL的支援包

PM> Install-Package DotNetCore.CAP.SqlServer

三、具體實現

3.1 OrderService

  (1)啟動配置:這裡主要需要給CAP指定資料庫(它會在這個資料庫中建立本地訊息表Published和Received)以及使用到的訊息佇列(這裡是RabbitMQ)

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // Repository
        services.AddScoped<IOrderRepository, OrderRepository>();

        // EF DbContext
        services.AddDbContext<OrderDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:OrderDB"]);

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<OrderDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"]; 
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)Controller:這裡會呼叫Repository去實現業務邏輯和傳送訊息

    [Route("api/Order")]
    public class OrderController : Controller
    {
        public IOrderRepository OrderRepository { get; }

        public OrderController(IOrderRepository OrderRepository)
        {
            this.OrderRepository = OrderRepository;
        }

        [HttpPost]
        public string Post([FromBody]OrderDTO orderDTO)
        {
            var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult();

            return result ? "Post Order Success" : "Post Order Failed";
        }
    }

  (3)Repository:這裡實現了兩種方式:EF和Dapper(基於ADO.NET),其中EF方式中不需要傳transaction(當CAP檢測到 Publish 是在EF事務區域內的時候,將使用當前的事務上下文進行訊息的儲存),而基於ADO.NET方式中需要傳transaction(由於不能獲取到事務上下文,所以需要使用者手動的傳遞事務上下文到CAP中)。

    public class OrderRepository : IOrderRepository
    {
        public OrderDbContext DbContext { get; }
        public ICapPublisher CapPublisher { get; }
        public string ConnStr { get; } // For Dapper use

        public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr)
        {
            this.DbContext = DbContext;
            this.CapPublisher = CapPublisher;
            this.ConnStr = ConnStr;
        }

        public async Task<bool> CreateOrderByEF(IOrder order)
        {
            using (var trans = DbContext.Database.BeginTransaction())
            {
                var orderEntity = new Order()
                {
                    ID = GenerateOrderID(),
                    OrderUserID = order.OrderUserID,
                    OrderTime = order.OrderTime,
                    OrderItems = null,
                    ProductID = order.ProductID // For demo use
                };

                DbContext.Orders.Add(orderEntity);
                await DbContext.SaveChangesAsync();

                // When using EF, no need to pass transaction
                var orderMessage = new OrderMessage()
                {
                    ID = orderEntity.ID,
                    OrderUserID = orderEntity.OrderUserID,
                    OrderTime = orderEntity.OrderTime,
                    OrderItems = null,
                    ProductID = orderEntity.ProductID // For demo use
                };
                
                await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage);

                trans.Commit();
            }

            return true;
        }

        public async Task<bool> CreateOrderByDapper(IOrder order)
        {
            using (var conn = new SqlConnection(ConnStr))
            {
                conn.Open();
                using (var trans = conn.BeginTransaction())
                {
                    // business code here
                    string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID)
                                                                VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)";

                    order.ID = GenerateOrderID();
                    await conn.ExecuteAsync(sqlCommand, param: new
                    {
                        OrderID = order.ID,
                        OrderTime = DateTime.Now,
                        OrderUserID = order.OrderUserID,
                        ProductID = order.ProductID
                    }, transaction: trans);

                    // For Dapper/ADO.NET, need to pass transaction
                    var orderMessage = new OrderMessage()
                    {
                        ID = order.ID,
                        OrderUserID = order.OrderUserID,
                        OrderTime = order.OrderTime,
                        OrderItems = null,
                        MessageTime = DateTime.Now,
                        ProductID = order.ProductID // For demo use
                    };

                    await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans);

                    trans.Commit();
                }
            }

            return true;
        }

        private string GenerateOrderID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }

        private string GenerateEventID()
        {
            // TODO: Some business logic to generate Order ID
            return Guid.NewGuid().ToString();
        }
    }

  這裡摘抄一段CAP wiki中關於事務的一段介紹:

  事務在 CAP 具有重要作用,它是保證訊息可靠性的一個基石。 在傳送一條訊息到訊息佇列的過程中,如果不使用事務,我們是沒有辦法保證我們的業務程式碼在執行成功後訊息已經成功的傳送到了訊息佇列,或者是訊息成功的傳送到了訊息佇列,但是業務程式碼確執行失敗。

  這裡的失敗原因可能是多種多樣的,比如連線異常,網路故障等等。

  只有業務程式碼和CAP的Publish程式碼必須在同一個事務中,才能夠保證業務程式碼和訊息程式碼同時成功或者失敗

換句話說,CAP會確保我們這段邏輯中業務程式碼和訊息程式碼都成功了,才會真正讓事務commit。

3.2 StorageService

  (1)啟動配置:這裡主要是指定Subscriber

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        // EF DbContext
        services.AddDbContext<StorageDbContext>();

        // Dapper-ConnString
        services.AddSingleton(Configuration["DB:StorageDB"]);

        // Subscriber
        services.AddTransient<IOrderSubscriberService, OrderSubscriberService>();

        // CAP
        services.AddCap(x =>
        {
            x.UseEntityFramework<StorageDbContext>(); // EF

            x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server

            x.UseRabbitMQ(cfg =>
            {
                cfg.HostName = Configuration["MQ:Host"];
                cfg.VirtualHost = Configuration["MQ:VirtualHost"];
                cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]);
                cfg.UserName = Configuration["MQ:UserName"];
                cfg.Password = Configuration["MQ:Password"];
            }); // RabbitMQ

            // Below settings is just for demo
            x.FailedRetryCount = 2;
            x.FailedRetryInterval = 5;
        });

        ......
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime)
    {
        ......

        app.UseMvc();

        // CAP
        app.UseCap();

        ......
    }

  (2)實現Subscriber

  首先定義一個介面,建議放到公共類庫中

    public interface IOrderSubscriberService
    {
        Task ConsumeOrderMessage(OrderMessage message);
    }

  然後實現這個介面,記得讓其實現ICapSubscribe介面,然後我們就可以使用 CapSubscribeAttribute 來訂閱 CAP 釋出出來的訊息。

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;
        
        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}");
            await UpdateStorageNumberAsync(message);
        }

        private async Task<bool> UpdateStorageNumberAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1
                                                                WHERE StorageID = @ProductID";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    ProductID = order.ProductID
                });

                return count > 0;
            }
        }
    }

  *.CAP約定訊息端在方法實現的過程中需要實現冪等性,所謂冪等性就是指使用者對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點選而產生了副作用。這裡我沒有考慮,實際中需要首先進行驗證,避免二次更新

3.3 DeliveryService

  (1)啟動配置:與StorageService高度類似,只是使用的不是同一個資料庫

  (2)實現Subscriber

    public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe
    {
        private readonly string _connStr;

        public OrderSubscriberService(string connStr)
        {
            _connStr = connStr;
        }

        [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)]
        public async Task ConsumeOrderMessage(OrderMessage message)
        {
            await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}");
            await AddDeliveryRecordAsync(message);
        }

        private async Task<bool> AddDeliveryRecordAsync(OrderMessage order)
        {
            //throw new Exception("test"); // just for demo use
            using (var conn = new SqlConnection(_connStr))
            {
                string sqlCommand = @"INSERT INTO [dbo].[Deliveries] (DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime)
                                                            VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)";

                int count = await conn.ExecuteAsync(sqlCommand, param: new
                {
                    DeliveryID = Guid.NewGuid().ToString(),
                    OrderID = order.ID,
                    OrderUserID = order.OrderUserID,
                    ProductID = order.ProductID,
                    CreatedTime = DateTime.Now
                });

                return count > 0;
            }
        }
    }

3.4 快速測試

  (1)啟動3個微服務,Check 資料庫表狀態

  

  首先會看到在各個資料庫中均建立了本地訊息表,這兩個表的含義如下:

  Cap.Published:這個表主要是用來儲存 CAP 傳送到MQ(Message Queue)的客戶端訊息,也就是說你使用 ICapPublisher 介面 Publish 的訊息內容。

  Cap.Received:這個表主要是用來儲存 CAP 接收到 MQ(Message Queue) 的客戶端訂閱的訊息,也就是使用 CapSubscribe[] 訂閱的那些訊息。

  

  然後看看各個表的資料,目前只有庫存表有資料,因為我們要做的只是更新。

  (2)通過Postman發一個Post請求

  

  (3)Check控制檯輸出的日誌資訊

  

  

  (4)Check資料庫中的業務表和訊息表資料:可以看到傳送者和接收者都執行成功了,如果其中任何一個參與者發生了異常或者連線不上,CAP會有預設的重試機制(預設是50次最大重試次數,每次重試間隔60s),當失敗總次數達到預設失敗總次數後,就不會進行重試了,我們可以在 Dashboard 中檢視訊息失敗的原因,然後進行人工重試處理。

  

  

  另外,由於CAP會在資料庫中建立訊息表,因此難免會考慮到其效能。CAP提供了一個數據清理的機制,預設情況下會每隔一個小時將訊息表的資料進行清理刪除,避免資料量過多導致效能的降低。清理規則為 ExpiresAt (欄位名)不為空並且小於當前時間的資料。

四、小結

  本篇首先簡單介紹了一下CAP這個開源專案,然後基於上一篇中的下訂單的小案例來進行了基於CAP的改造,並通過一個例項的執行來看到了結果。當然,這個例項並不完美,很多點都沒有考慮(比如訊息端消費時的冪等性)和失敗重試的場景實踐等等等等。由於時間和精力的關係,目前只使用到這兒,以後有機會能夠應用上會研究下CAP的原始碼,最後感謝楊曉東為.NET社群帶來了一個優秀的開源專案!

示例程式碼

  Click Here => 點我點我

參考資料

作者:周旭龍

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連結。

相關推薦

.NET Core服務開源專案CAP初步使用

一、CAP簡介   CAP 是一個在分散式系統中(SOA,MicroService)實現事件匯流排及最終一致性(分散式事務)的一個開源的 C# 庫,她具有輕量級,高效能,易使用等特點。我們可以輕鬆的在基於 .NET Core 技術的分散式系統中引入CAP,包括但限於 ASP.NET Core 和

.NET Core服務基於Consul實現服務治理

請求轉發 1.0 asp.net AC port prefix 我們 tle nan 一、Consul基礎介紹   Consul是HashiCorp公司推出的開源工具,用於實現分布式系統的服務發現與配置。與其他分布式服務註冊與發現的方案,比如 Airbnb的Smart

.NET Core服務基於Consul實現服務治理(續)

shell pla code tst 分層 編輯 set req \n 上一篇發布之後,這一篇把上一篇沒有弄到的東西補一下,也算是給各位前來詢問的朋友的一些回復吧。一、Consul服務註冊之配置文件方式1.1 重溫Consul實驗集群  這裏我們有三個Consul Serv

.net core 服務Api閘道器(Api Gateway)

微服務閘道器目錄 1、 微服務引子 2、使用Nginx作為api閘道器 3、自創api閘道器(重複輪子) 3.1、構建初始化 3.2、構建中介軟體 4、結語

基於Apollo實現.NET Core服務統一配置(測試環境-單機) .NET Core服務基於Apollo實現統一配置中心

一、前言 注:此篇只是為測試環境下的快速入門。後續會給大家帶來生產環境下得實戰開發。 具體的大家可以去看官方推薦。非常的簡單明瞭。以下介紹引用官方內容: Apollo(阿波羅)是攜程框架部門研發的分散式配置中心,能夠集中化管理應用不同環境、不同叢集的配置,配置修改後能夠實時推送到應用端,並且具

.NET Core服務基於Steeltoe使用Eureka實現服務註冊與發現

一、關於Steeltoe與Spring Cloud    Steeltoe is an open source project that enables .NET developers to implement industry standard best practices when b

.NET Core服務基於Steeltoe整合Zuul實現統一API閘道器

一、關於Spring Cloud Zuul   API Gateway(API GW / API 閘道器),顧名思義,是出現在系統邊界上的一個面向API的、序列集中式的強管控服務,這裡的邊界是企業IT系統的邊界。   Zuul 是Netflix 提供的一個開源元件,致力於在雲平臺上提供動態路由,監

.NET Core服務基於Steeltoe使用Hystrix熔斷保護與監控

一、關於Spring Cloud Hystrix      在微服務架構中,我們將系統拆分為很多個服務,各個服務之間通過註冊與訂閱的方式相互依賴,由於各個服務都是在各自的程序中執行,就有可能由於網路原因或者服務自身的問題導致呼叫故障或延遲,隨著服務的積壓,可能會導致服務崩潰。為了解決這一系列的問題

.NET Core服務基於Steeltoe使用Spring Cloud Config統一管理配置

一、關於Spring Cloud Config   在分散式系統中,每一個功能模組都能拆分成一個獨立的服務,一次請求的完成,可能會呼叫很多個服務協調來完成,為了方便服務配置檔案統一管理,更易於部署、維護,所以就需要分散式配置中心元件了,在Spring Cloud中,就有這麼一個分散式配置中心元件 —

NET Core服務路:自己動手實現Rpc服務框架,基於DotEasy.Rpc服務框架的介紹和整合

本篇內容屬於非實用性(拿來即用)介紹,如對框架設計沒興趣的朋友,請略過。   快一個月沒有寫博文了,最近忙著兩件事;    一:閱讀劉墉先生的《說話的魅力》,以一種微妙的,你我大家都會經常遇見的事物,來建議說話的“藝術和魅力”,對於我們從事軟體開發、不太善於溝通

.NET Core服務路:利用DotNetty實現一個簡單的通訊過程

  上一篇我們已經全面的介紹過《基於gRPC服務發現與服務治理的方案》,我們先複習一下RPC的呼叫過程(筆者會在這一節的幾篇文章中反覆的強調這個過程呼叫方案),看下圖

NET Core服務路:自己動手實現Rpc服務框架,基於DotEasy.Rpc服務框架的介紹和整合...

本篇內容屬於非實用性(拿來即用)介紹,如對框架設計沒興趣的朋友,請略過。  快一個月沒有寫博文了,最近忙著兩件事;    一:閱讀劉墉先生的《說話的魅力》,以一種微妙的,你我大家都會經常遇見的事物,來建議說話的“藝術和魅力”,對於我們從事軟體開發、不太善

.NET Core服務路:讓我們對上一個Demo通訊進行修改,完成RPC通訊

 最近一段時間有些事情耽擱了更新,抱歉各位了。   上一篇我們簡單的介紹了DotNetty通訊框架,並簡單的介紹了基於DotNetty實現了迴路(Echo)通訊過程。   我們來回憶一下上一個專案的整個流程: 當服務端啟動後,繫結並監聽(READ)設定的埠,比如1889。

.NET Core服務基於Steeltoe使用Zipkin實現分散式追蹤

一、關於Spring Cloud Sleuth與Zipkin   在 SpringCloud 之中提供的 Sleuth 技術可以實現微服務的呼叫跟蹤,也就是說它可以自動的形成一個呼叫連線線,通過這個連線線使得開發者可以輕鬆的找到所有微服務間關係,同時也可以獲取微服務所耗費的時間, 這樣就可以進行微服

.NET Core服務基於Jenkins+Docker實現持續部署(Part 1)

一、CI, CD 與Jenkins   網際網路軟體的開發和釋出,已經形成了一套標準流程,最重要的組成部分就是持續整合(Continuous integration,簡稱 CI) => 持續整合指的是,頻繁地(一天多次)將程式碼整合到主幹。   它的好處主要有兩個: 快速發現錯

.NET Core服務基於App.Metrics+InfluxDB+Grafana實現統一效能監控

一、關於App.Metrics+InfluxDB+Grafana 1.1 App.Metrics      App.Metrics是一款開源的支援.NET Core的監控外掛,它還可以支援跑在.NET Framework上的應用程式(版本 >= 4.5.2)。官方文件地址:https://ww

.NET Core服務基於Apollo實現統一配置中心

一、關於統一配置中心與Apollo   在微服務架構環境中,專案中配置檔案比較繁雜,而且不同環境的不同配置修改相對頻繁,每次釋出都需要對應修改配置,如果配置出現錯誤,需要重新打包釋出,時間成本較高,因此需要做統一的配置中心,能做到自動更新配置檔案資訊,解決以上問題。   Apollo(阿波羅)是攜

.NET Core服務基於Ocelot實現API閘道器服務

一、啥是API閘道器?   API 閘道器一般放到微服務的最前端,並且要讓API 閘道器變成由應用所發起的每個請求的入口。這樣就可以明顯的簡化客戶端實現和微服務應用程式之間的溝通方式。以前的話,客戶端不得不去請求微服務A(假設為Customers),然後再到微服務B(假設為Orders),然後是微服

.NET Core服務基於Exceptionless實現分散式日誌記錄

一、Exceptionless極簡介紹   Exceptionless 是一個開源的實時的日誌收集框架,它可以應用在基於 ASP.NET,ASP.NET Core,Web API,Web Forms,WPF,Console,ASP.NET MVC 等技術開發的應用程式中,並且提供了REST介面可以應

.NET Core服務ASP.NET Core on Docker

一、Docker極簡介紹 1.1 總體介紹   Docker 是一個開源的應用容器引擎,基於 Go 語言 並遵從Apache2.0協議開源。Docker 可以讓開發者打包他們的應用以及依賴包到一個輕量級、可移植的容器中,然後釋出到任何流行的 Linux 機器上,也可以實現虛擬化。容器是完全使用沙箱