.net 微服務實踐
l 前言
本文記錄了我的一次.net core 微服務架構實踐經驗,以及所用到的技術
l 優點
- 每個服務聚焦於一塊業務,無論在開發階段或是部署階段都是獨立的,更適合被各個小團隊開發維護,團隊對服務的整個生命週期負責,工作在獨立的上下文之中。
-
如果某一項服務的效能達到瓶頸,我們只需要增加該服務負載節點,能夠針對系統的瓶頸服務更有效的使用資源。
- 服務A可以使用.net實現 ,服務B可以使用java實現,技術選型靈活,系統不會長期限制在某個技術棧上。
- 鬆耦合、高內聚,程式碼容易理解,開發效率高,更好維護。
- 高可用,每個服務可以啟動多個例項負載,單個例項掛了有足夠的響應時間來修復
l 缺點
- 系統規模龐大,運維要求高,需要devops技巧(Jenkins,Kubernetes等等)
- 跨服務需求需要團隊之間的協作
- 跨服務的呼叫(http/rpc)增加了系統的延遲
l Docker
docker是目前普遍使用的容器化技術,在此架構中我們的應用程式將部署在docker容器裡面,通過docker釋出應用 需要先編寫一個dockerfile,如下
#引入映象 .net core 3.1 FROM mcr.microsoft.com/dotnet/core/aspnet:3.1-buster-slim AS base #設定工作目錄 WORKDIR /app #在容器中程式使用的埠,一定要和程式啟動使用的埠對應上 EXPOSE 80 #複製檔案到工作目錄 COPY . . #環境變數 此變數會覆蓋appsetting.json 內的同名變數 ENV Ip "" ENV Port "" #啟動程式 ENTRYPOINT ["dotnet", "Union.UserCenter.dll"]
docker build 命令 將我們的釋出目錄打包一個docker映象,例如 docker build -t test . ,test是映象名稱
docker run 命令啟動我們打包的映象,例如 docker run -d -p 5002:80 --name="test1" -e Ip="192.168.0.164" -e Port="5002" test ,-e 表示傳遞環境變數
更多docker命令 請查閱:https://www.runoob.com/docker/docker-command-manual.html
docker官網:https://www.docker.com
- 部署方便:只需要一個簡單的 docker run命令,就可以啟動一個應用例項了
- 部署安全:打包映象的時候已經打包了應用所需環境,執行環境不會出現任何問題
- 隔離性好:同一臺機器我可以部署java的應用和.net的應用,互不影響
- 快速回滾:只要映象存在可以快速回滾到任一版本
- 成本低:一臺機器可以執行很多例項,很容易就可以實現高可用和橫向擴充套件
經測試docker for windows不適合部署生產環境,還是得在liunx系統上跑, .net framework 無法在docker上部署
Docker compose :Docker官方提供的管理工具,可以簡單的配置一組容器啟動引數、啟動順序、依賴關係
Kubernetes :容器數量很多之後會變得難以管理,可以引入Kubernetes對容器進行自動管理,熟練運用有一定難度,中文社群:https://www.kubernetes.org.cn/k8s
l RPC 遠端過程呼叫
為什麼要有RPC
按照微服務設計思想,服務A只專注於服務A的業務,但是需求上肯定會有服務A需要呼叫服務B來完成一個業務處理的情況,使用http呼叫其他服務效率相對較低,所以引入了RPC。
gRPC vs thrift 評測:https://www.cnblogs.com/softidea/p/7232035.html
這裡使用thrift,thrift 官網:http://thrift.apache.org
Thrift 採用IDL(Interface Definition Language)來定義通用的服務介面,然後通過Thrift提供的編譯器,可以將服務介面編譯成不同語言編寫的程式碼,通過這個方式來實現跨語言的功能,語法請自行百度
下載thrift 程式碼生成器 http://thrift.apache.org/download ,thrift-0.13.0.exe 這個檔案
執行命令 thrift.exe --gen netcore xxxxxxx.thrift ,生成C# 服務介面程式碼
引用官方提供的.net 庫,可以去官網下載,找不到的可以直接 nuget引用 Examda.Thrift,這是我為了方便使用上傳的
新增生成的程式碼到我們的服務端裡,然後自己實現 thrift檔案定義的介面
using System.Threading; using System.Threading.Tasks; using Union.UnionInfo.Service.Interface; using static Examda.Contract.UnionInfo.UnionInfoService; namespace Union.UnionInfo.Service { public class UnionInfoServiceImpl : IAsync { private readonly ILmMembersInfoService _lmMembersInfoService; public UnionInfoServiceImpl(ILmMembersInfoService lmMembersInfoService) { _lmMembersInfoService = lmMembersInfoService; } //實現介面 public async Task<string> GetUnionIdAsync(string DozDomain, CancellationToken cancellationToken) { return (await _lmMembersInfoService.GetMembersInfoByDozDomain(DozDomain)).UnionId; } } }
新增一個類繼承 IHostedService
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading; using System.Threading.Tasks; using Thrift; using Thrift.Protocols; using Thrift.Server; using Thrift.Transports; using Thrift.Transports.Server; namespace Examda.Core.Rpc { public class RpcServiceHost : IHostedService { public IConfiguration Configuration { get; } public ITAsyncProcessor Processor { get; } public ILoggerFactory LoggerFactory { get; } public RpcServiceHost(IConfiguration configuration, ITAsyncProcessor processor,ILoggerFactory loggerFactory) { Configuration = configuration; Processor = processor; LoggerFactory = loggerFactory; } // public virtual Task StartAsync(CancellationToken cancellationToken) { TServerTransport serverTransport = new TServerSocketTransport(Configuration.GetValue<int>("RpcPort")); TBinaryProtocol.Factory factory1 = new TBinaryProtocol.Factory(); TBinaryProtocol.Factory factory2 = new TBinaryProtocol.Factory(); //UnionInfoService.AsyncProcessor processor = new AsyncProcessor(new UnionInfoServiceImpl());實現的服務這裡採用.net core 自帶 DI注入,也可以直接例項化 TBaseServer server = new AsyncBaseServer(Processor, serverTransport, factory1, factory2, LoggerFactory); return server.ServeAsync(cancellationToken); } public virtual Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }
修改ConfigureServices新增如下程式碼
//注入rpc服務實現例項 services.AddSingleton<ITAsyncProcessor>(provider => { var lmMembersInfoService = provider.GetService<ILmMembersInfoService>(); return new AsyncProcessor(new UnionInfoServiceImpl(lmMembersInfoService)); }); //監聽rpc埠 services.AddHostedService<RpcServiceHost>();
服務端就完成了,接下來編寫客戶端呼叫,修改客戶端ConfigureServices新增如下程式碼
//test rpc服務 services.AddScoped(provider => { var examdaConsul = provider.GetService<ExamdaConsul>(); Address address = examdaConsul.GetAddress("UnionInfo");//獲取服務地址,這裡我封裝了,測試可以先直接寫死 var tClientTransport = new TSocketClientTransport(IPAddress.Parse(address.Ip), address.Port); var tProtocol = new TBinaryProtocol(tClientTransport); return new UnionInfoService.Client(tProtocol); });
控制器內呼叫示例
using System.Threading; using System.Threading.Tasks; using Examda.Contract.UnionInfo; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; namespace RPCCLIENT.Controllers { [ApiController] [Route("[controller]")] public class WeatherForecastController : ControllerBase { private static readonly string[] Summaries = new[] { "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" }; private readonly UnionInfoService.Client _rpcClient; private readonly ILogger<WeatherForecastController> _logger; public WeatherForecastController(ILogger<WeatherForecastController> logger, UnionInfoService.Client rpcClient) { _logger = logger; _rpcClient = rpcClient; } [HttpGet] public async Task<IActionResult> Get() { await _rpcClient.OpenTransportAsync(CancellationToken.None); var order = await _rpcClient.GetUnionIdAsync("wx.hdgk.cn", CancellationToken.None);//rpc呼叫 return Ok(order); } } }
l 服務註冊與發現
為什麼要有服務註冊與發現
例如:服務A一開始只有一個例項,此時又啟動了一個服務A的例項,但是呼叫服務A的服務B並不知道 服務A多了一個例項(或者少了),此時引入服務註冊與發現可以讓服務B得知服務A的變更情況,服務B就知道自己要呼叫的服務IP:埠 是多少,不需要人工干預
常見的註冊中心
這裡使用consul
健康檢查:consul自帶健康檢查,檢查服務是否可用,不可用的服務將從註冊中心剔除,自帶的就是隔一段時間檢測一下埠通不通,並且支援自行擴充套件健康檢查,可用自己在服務內實現是否健康的邏輯,比如雖然介面是通的,但是我發現自己宿主機cpu過80%了,就返回不健康的狀態
服務註冊:nuget安裝consul,寫一個擴充套件方法
/// <summary> /// 如果服務同時包含http,rpc呼叫此方法 /// </summary> /// <param name="services"></param> /// <param name="Configuration"></param> /// <param name="ServiceName"></param> /// <param name="Remark"></param> public static void AddExamdaServiceRpc(this IServiceCollection services, IConfiguration Configuration, string ServiceName, string Remark) { var Ip = Configuration.GetValue<string>("Ip"); var RpcPort = Configuration.GetValue<int>("RpcPort"); var RpcAddress = $"{Ip}:{RpcPort}"; var consulClient = new ConsulClient(x => x.Address = new Uri(Configuration.GetValue<string>("ConsulUrl")));//請求註冊的 Consul 地址 var httpCheck = new AgentServiceCheck() { DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服務啟動多久後註冊 Interval = TimeSpan.FromSeconds(20),//健康檢查時間間隔,或者稱為心跳間隔 Timeout = TimeSpan.FromSeconds(5), TCP = RpcAddress }; var registration = new AgentServiceRegistration() { Checks = new[] { httpCheck }, ID = RpcAddress, Name = ServiceName, Address = Ip, Port = RpcPort, Tags = new[] { Remark } }; consulClient.Agent.ServiceRegister(registration).Wait(); //應用程式退出時 AppDomain.CurrentDomain.ProcessExit += (sender, e) => { consulClient.Agent.ServiceDeregister(registration.ID).Wait();//consul取消註冊服務 }; }
修改ConfigureServices新增如下程式碼,啟動
services.AddExamdaServiceRpc(Configuration, "UnionInfo", "聯盟機構資訊服務");
安裝consul請自行百度
服務發現與變更:呼叫方配置好自己需要呼叫的服務名稱集合,然後去consul獲取地址列表,然後根據需要呼叫的服務數量啟動N個執行緒來輪詢服務最新的地址資訊,不用擔心輪詢造成的消耗過大,因為consul提供了Blocking Queries 阻塞查詢的方式,請求傳送到consul之後會在consul阻塞(30)秒,期間有變更或者到達30秒了之後才會返回地址列表,然後每一次變更之後的地址列表都會有一個新的版本號。
using Consul; using Microsoft.Extensions.Configuration; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Examda.Core.Consul { public class Address { public string Ip { get; set; } public int Port { get; set; } } /// <summary> /// 未實現服務負載均衡,這裡隨機選一個 /// </summary> public class ExamdaConsul { private object locker = new object(); private readonly ConsulClient _consulClient; private IDictionary<string, List<Address>> RpcServices { get; set; } public ExamdaConsul(IConfiguration configuration) { RpcServices = new Dictionary<string, List<Address>>(); _consulClient = new ConsulClient(c => { c.Address = new Uri(configuration.GetValue<string>("ConsulUrl")); }); foreach (var item in configuration.GetSection("RpcServiceClient").GetChildren().Select(x => x.Value).ToList())//遍歷所需要呼叫的服務名稱集合 { RpcServices.Add(item, null); var res = _consulClient.Catalog.Service(item).Result; RpcServices[item] = res.Response.Select(x => new Address() { Ip = x.ServiceAddress, Port = x.ServicePort }).ToList(); Task.Factory.StartNew(() => { var queryOptions = new QueryOptions { WaitTime = TimeSpan.FromSeconds(30) };//阻塞時間 queryOptions.WaitIndex = res.LastIndex; while (true) { GetAgentServices(queryOptions, item); } }); } } private void GetAgentServices(QueryOptions queryOptions, string serviceName) { var res = _consulClient.Catalog.Service(serviceName, null, queryOptions).Result; if (queryOptions.WaitIndex != res.LastIndex) { lock (locker) { queryOptions.WaitIndex = res.LastIndex; var currentServices = RpcServices[serviceName]; RpcServices[serviceName] = res.Response.Select(x => new Address() { Ip = x.ServiceAddress, Port = x.ServicePort }).ToList(); } } } /// <summary> /// 獲取服務可用地址 /// </summary> /// <param name="serviceName"></param> /// <returns></returns> public Address GetAddress(string serviceName) { for (int i = 0; i < 3; i++) { Random r = new Random(); int index = r.Next(RpcServices.Count); try { return RpcServices[serviceName][index]; } catch { Thread.Sleep(10); continue; } } return null; } } }
然後注入一個ExamdaConsul類的單例,講寫死的服務地址改成從consul獲取
//注入consul客戶端 單例 services.AddSingleton<ExamdaConsul>(); //注入UnionInfo rpc客戶端 執行緒單例 services.AddScoped(provider => { var examdaConsul = provider.GetService<ExamdaConsul>(); Address address = examdaConsul.GetAddress("UnionInfo");//從consul獲取服務地址 var tClientTransport = new TSocketClientTransport(IPAddress.Parse(address.Ip), address.Port); var tProtocol = new TBinaryProtocol(tClientTransport); return new UnionInfoService.Client(tProtocol); });
consul 官網:https://www.consul.io
l API閘道器
所有的請求都先經過閘道器,由轉發到對應的服務,對比了 ocelot 和 Bumblebee 兩個c#寫的閘道器。選擇使用了Bumblebee。
Ocelot效能比較低,吞吐比直接訪問降低四倍,但是文件很全面,功能整合很多,不需要自己擴充套件什麼。
Bumblebee 我做測試發現Bumblebee 效能很優秀,尷尬的是這個幾乎沒什麼人用,很多功能需要自己擴充套件,作者官網http://beetlex.io/ Bumblebee 文件:http://doc.beetlex.io/#29322e3796694434894fc2e6e8747626
這裡使用Bumblebee ,使用方法可以看作者的文件
健康檢查:不健康的節點將不會被轉發請求
限流:例如限制某個節點最多300rps,如果此節點併發了1000個請求,大概會有700個左右請求閘道器會直接返回錯誤,不會轉發到具體的服務,可以起到擋洪作用,避免節點直接掛了。
路由:我是這麼設定的 例如 http://192.168.0.164/Course/Tool/GetUserInfo ,Course一級是服務名稱 tool 是服務的控制器名稱 getuserinfo是方法名稱
負載均衡:服務多個節點負載,閘道器可以設定負載均衡策略
註冊到閘道器:redis釋出訂閱實現,新增一個擴充套件方法
public static void AddExamdaService(this IServiceCollection services, IConfiguration Configuration, string ServiceName, string Remark) { var Ip = Configuration.GetValue<string>("Ip"); var Port = Configuration.GetValue<int>("Port"); var Address = $"http://{Ip}:{Port}"; services.AddSingleton(new Redis(Configuration.GetValue<string>("Redis"))); ServiceProvider serviceProvider = services.BuildServiceProvider(); Redis redis = serviceProvider.GetService<Redis>(); redis.Publish("ApiGetewap", JsonConvert.SerializeObject(new { Address, ServiceName, Remark })); AppDomain.CurrentDomain.ProcessExit += (sender, e) => { redis.Publish("ApiGetewapExit", JsonConvert.SerializeObject(new { Address, ServiceName, Remark })); }; }
閘道器訂閱這個頻道
g = new OverrideApiGetewap(); g.HttpOptions(o => { o.Port = 80; o.LogToConsole = true; o.LogLevel = BeetleX.EventArgs.LogType.Error; }); g.Open(); var sub = Program.redis.GetSubscriber(); //註冊服務 sub.Subscribe("ApiGetewap",(chanel,message)=> { var service = JsonConvert.DeserializeObject<Service>(message); var route = g.Routes.NewOrGet(string.Format("^/{0}.*", service.ServiceName), service.Remark); route.AddServer(service.Address, 0); }); //服務退出 sub.Subscribe("ApiGetewapExit", (chanel, message) => { var service = JsonConvert.DeserializeObject<Service>(message); var route = g.Routes.NewOrGet(string.Format("^/{0}.*", service.ServiceName), service.Remark); route.RemoveServer(service.Address); });
修改ConfigureServices新增如下程式碼,啟動。這樣閘道器也能動態的發現我們的服務了
//註冊此服務到閘道器 services.AddExamdaService(Configuration, "Course", "聯盟我的課程服務");
異常流量拉黑:例如某個ip 10s內請求數量超過300 將他拉黑 30 分鐘,這裡使用redis實現計數器
自己寫的簡陋版本
//請求完成觸發的事件,不會阻塞請求 g.RequestIncrement += (sender, e) => { Task.Factory.StartNew(() => { var db = Program.redis.GetDatabase(); var counter = db.KeyExists(e.Request.RemoteIPAddress);//判斷該ip是否存在計數器 if (counter) { var count = db.StringIncrement(e.Request.RemoteIPAddress);//計數器加1 if (count > 300) { db.StringSet("BlackList_" + e.Request.RemoteIPAddress, "", new TimeSpan(0, 1, 0), flags: StackExchange.Redis.CommandFlags.FireAndForget);//拉黑半個小時,不等待返回值 } } else { db.StringIncrement(e.Request.RemoteIPAddress, flags: StackExchange.Redis.CommandFlags.FireAndForget);//建立計數器 db.KeyExpire(e.Request.RemoteIPAddress, new TimeSpan(0, 0, 10), flags: StackExchange.Redis.CommandFlags.FireAndForget);//設定10s過期 } }); };
class OverrideApiGetewap : Bumblebee.Gateway { //請求管道的第一個事件 protected override void OnHttpRequest(object sender, EventHttpRequestArgs e) { if (!e.Request.Path.Contains("/__system/bumblebee") && e.Request.Path != "/")//排除掉訪問閘道器ui的 { var db = Program.redis.GetDatabase(); var isBlack = db.KeyExists("BlackList_" + e.Request.RemoteIPAddress); if (isBlack) { e.Response.Result(new JsonResult("你被拉黑了")); e.Cancel = true;//取消請求 } else { base.OnHttpRequest(sender, e); } //base.OnHttpRequest(sender, e); } else { base.OnHttpRequest(sender, e); } } }
熔斷器:當某個請求轉發下游服務返回錯誤次數或者超時次數達到閥值時自動熔斷該節點,暫未實現
介面驗籤:客戶端請求都帶上用 url時間戳 引數加密的簽名,閘道器進行驗證,確保是合法的客戶端
閘道器自帶UI
l 鏈路追蹤 效能監控
Skywalking 官網:http://skywalking.apache.org/
每個請求的鏈路,每一個步驟的耗時都可以查到,如下圖的一個請求執行了很多次sql,每個步驟的sql語句都可以看到,整合很簡單,使用官方提供的.net探針整合到各個服務就好了,無程式碼入侵。
有一個很強大的ui介面,也可以提供報警等功能,ui可以檢視到響應很慢的介面,平均響應時間,以及每個服務的關聯關係,但是有個問題我沒有解決,RPC鏈路追蹤不到。
可以自行去官方查閱使用文件
l 分散式日誌收集框架
例項太多了,不可能使用單機日誌,需要一個分散式日誌收集框架把所有日誌收集到一起,可以考慮使用java的elk 或者 .net core 的Exceptionless
l 分散式事務
跨服務之間呼叫並且涉及到事務的處理方式,還在想怎麼弄
l 配置中心
各個例項逐個配置太麻煩了,特別是如果更改了資料庫地址,每一個服務的所有例項都要改,改死去,並且重啟例項也不現實,一定要支援配置熱更新,試了下攜程的Apollo有點消耗資源
l CI/CD
將原始碼管理做一個開發分支,一個測試分支,一個釋出分支,開發只動開發分支,開發完成後提交程式碼,由測試合併到測試分支,並通知Jenkins生成映象併發布到測試站點,測試通過之後由運維合併到釋出分支,或手動或自動通過Jenkins釋出,應該保證 測試分支與釋出分支的版本能對應docker映象倉庫的每一個版本,個人見解。
l 例:XXXX服務的專案原始碼結構
記錄與分享自己的一次微服務實踐
以上均為個人見解,不對的地方或者好的建議歡迎來信 [email protected]