Dotnet微服務:Steeltoe使用Hystrix熔斷器
一,準備內容
熔斷:微服務架構不可避免需要進行服務間的通訊,當某個服務呼叫另一個服務,當被呼叫服務出現狀況,如宕機、網路延遲等狀況不能做出及時響應時,切斷該服務,避免當前服務不斷進行重試而可能導致的系統“雪崩”或大量的超時等待影響服務效能。
降級:觸發熔斷後,在一個可配置時間內所有對被熔斷服務的請求都將被做請求降級處理,以提供一個有損的服務,保證整個系統的穩定性和可以性。
hystrix:Netflix API 團隊開發,通過分離服務的呼叫點,阻止錯誤在各個系統的傳播,並且提供了錯誤回撥機制,被設計用來做了下面幾件事:
- 保護系統間的呼叫延時以及錯誤,特別是通過第三方的工具的網路呼叫
- 阻止錯誤在分散式系統之前的傳播
- 快速失敗和迅速恢復
- 錯誤回退和優雅的服務降級
Steeltoe.CircuitBreaker.Hystrix:Dotnet平臺上Hystrix的實現包。官方文件
Hystrix熔斷觸發機制:
1,自動觸發:通過配置Hystrix以下引數實現自動觸發
//是否根據斷路器的執行狀態開啟斷路器短路 circuitBreaker.enabled //sleepWindowInMilliseconds時間視窗內斷路器短路的最小請求值 circuitBreaker.requestVolumeThreshold //觸發斷路器的週期時間值 circuitBreaker.sleepWindowInMilliseconds //斷路器的sleepWindowInMilliseconds視窗期內能夠容忍的錯誤百分比閾值 circuitBreaker.errorThresholdPercentage
理解:在circuitBreaker.sleepWindowInMilliseconds時間內最少有circuitBreaker.requestVolumeThreshold個請求並且請求異常數量達到總數的circuitBreaker.errorThresholdPercentage百分比時觸發熔斷
更多配置資訊請參考:https://github.com/Netflix/Hystrix/wiki/Configuration
2,強制觸發
//強制關閉 circuitBreaker.forceClosed //強制開啟 circuitBreaker.forceOpen
二,使用Steeltoe.CircuitBreaker.Hystrix
下面使用例項場景模擬說明Steeltoe.CircuitBreaker.Hystrix的使用和作用。
有二個服務,使用者管理服務及位置管理服務,用管理服務通過IHttpclientFactory向位置管理服務查詢某個使用者家庭地址的GPS座標,位置管理服務出現狀況不能及時迴應,通過Hystrix的Failback返回一個預設值。
1,安裝Nuget安裝包:Steeltoe.CircuitBreaker.Hystrix
2,使用者管理服務使用IHttpclientFactory訪問位置管理服務:參考之前的文章:使用IHttpclientFactory實現服務之間的通訊
位置管理服務:新增獲取位置介面:
[HttpPost("GetPosition")] public object GetPosition([FromBody] string address) { return new { Lat = 127.21455, Lng = 52.4878997, Address = address }; }
使用者管理服務:建立型別化客戶端:
public class OrderServiceRemoting { public HttpClient client { get; } public OrderServiceRemoting(HttpClient client) { client.BaseAddress = new Uri("http://eureka-order-service"); client.DefaultRequestHeaders.Add("OrderServiceHeader", "test"); this.client = client; } public async Task<string> GetOrderAsync(string name) { var request = new HttpRequestMessage(HttpMethod.Post, "api/order/getorder"); request.Content = new StringContent(JsonConvert.SerializeObject("test"), System.Text.Encoding.UTF8, "application/json"); using var ret =await client.SendAsync(request); ret.EnsureSuccessStatusCode(); return await ret.Content.ReadAsStringAsync(); } }
使用者管理服務:新增型別化客戶端 Startup.ConfigureServices
services.AddHttpClient("positionservice").AddServiceDiscovery().AddTypedClient<PositionServiceRemoting>();
使用者管理服務:使用:
[Route("api/[controller]")] [ApiController] public class UserController : ControllerBase { private readonly PositionServiceRemoting positionServiceRemoting; public UserController(PositionServiceRemoting positionServiceRemoting) { this.positionServiceRemoting = positionServiceRemoting; } [HttpGet("GetAddress")] public async Task<PostionModel> GetAddressAsync(string address) { return await positionServiceRemoting.GetPositionAsync(address); } }
訪問:
3,建立一個Hystrix命令,命令類必需繼承自HystrixCommand<ReslutType>。ReslutType為預計返回型別。
public interface IPositionServiceCommand { Task<PostionModel> GetAddressAsync(string address); } public class PositionServiceCommand:HystrixCommand<PostionModel>, IPositionServiceCommand { PositionServiceRemoting positionServiceRemoting; ILogger<PositionServiceCommand> log; private string address; public PositionServiceCommand( IHystrixCommandOptions options, PositionServiceRemoting positionServiceRemoting, ILogger<PositionServiceCommand> log) :base(options) { this.positionServiceRemoting = positionServiceRemoting; this.log = log; } public Task<PostionModel> GetAddressAsync(string address) { this.address = address; return ExecuteAsync(); } protected override Task<PostionModel> RunAsync() { return this.positionServiceRemoting.GetPositionAsync(address); } protected override Task<PostionModel> RunFallbackAsync() { PostionModel model = new PostionModel() { Address = this.address, Lat = 0, Lng = 0 }; log.LogInformation("進入fallback進行降級處理"); return Task.FromResult(model); } }
新增該命令
Startup.ConfigureServices
services.AddHystrixCommand<IPositionServiceCommand, PositionServiceCommand>("positionHystrix", Configuration);
使用命令訪問資源
private readonly IPositionServiceCommand positionServiceCommand; public UserController(IPositionServiceCommand positionServiceCommand) { this.positionServiceCommand = positionServiceCommand; } [HttpGet("GetAddress")] public async Task<PostionModel> GetAddressAsync(string address) { return await positionServiceCommand.GetAddressAsync(address); }
配置斷路器引數,將最小請求數改為4,系統預設值為20。意味著在預設時間內(10秒)如果有4次以上的請求並有50%的請求異常數則開啟熔斷。
Appsettings.json
"hystrix": { "command": { "positionHystrix": { "circuitBreaker": { "requestVolumeThreshold":4 } } } }
三,合併命令請求
應用場景:在一個時間視窗內請求同一服務的數量很多,可以將這些請求合併成一個請求,減少請求數量,優化資源佔用。
要求:被呼叫方需要提供批量返回結果的介面
1,位置管理服務新建批量返回結果
[HttpPost("GetPositions")] public object GetPositions([FromBody] List<string> address) { List<object> list = new List<object>(); if (address != null) { address.ForEach(r => { list.Add(new { Lat = 127.21455+1, Lng = 52.4878997+1, Address = r }); }); } return list; }
2,使用者管理服務:IHttpclientFactory型別化客戶端新增Getpositions的呼叫
public class PostionModel { public decimal Lat { get; set; } public decimal Lng { get; set; } public string Address { get; set; } } public class PositionServiceRemoting { HttpClient httpClient { get; } readonly ILogger logger; public PositionServiceRemoting(HttpClient httpClient,ILogger <PositionServiceRemoting> logger) { httpClient.BaseAddress = new Uri("http://eureka-order-service"); this.httpClient = httpClient; this.logger = logger; } public async Task<PostionModel> GetPositionAsync(string address) { var request = new HttpRequestMessage(HttpMethod.Post, "api/position/GetPosition") { Content = new StringContent(JsonConvert.SerializeObject(address), System.Text.Encoding.UTF8, "application/json") }; using var ret = await httpClient.SendAsync(request); ret.EnsureSuccessStatusCode(); string retString= await ret.Content.ReadAsStringAsync(); return JsonConvert.DeserializeObject<PostionModel>(retString); } public async Task<List<PostionModel>> GetPositionsAsync(List<string> address) { var request = new HttpRequestMessage(HttpMethod.Post, "api/position/GetPositions") { Content = new StringContent(JsonConvert.SerializeObject(address), System.Text.Encoding.UTF8, "application/json") }; using var ret = await httpClient.SendAsync(request); ret.EnsureSuccessStatusCode(); string retString = await ret.Content.ReadAsStringAsync(); return JsonConvert.DeserializeObject<List<PostionModel>>(retString); } }
3,使用者管理服務:為批量位置返回建立新的Hystrix命令
public class BatPositionServiceCommand : HystrixCommand<List<PostionModel>> { PositionServiceRemoting positionServiceRemoting; ILogger<BatPositionServiceCommand> log; ICollection<ICollapsedRequest<PostionModel, string>> requests; public BatPositionServiceCommand( IHystrixCommandGroupKey group, PositionServiceRemoting positionServiceRemoting, ICollection<ICollapsedRequest<PostionModel, string>> requests, ILogger<BatPositionServiceCommand> log) : base(group) { this.positionServiceRemoting = positionServiceRemoting; this.log = log; this.requests = requests; } protected override Task<List<PostionModel>> RunAsync() { return positionServiceRemoting.GetPositionsAsync(this.requests.ToList().Select(r=>r.Argument).ToList()); } protected override Task<List<PostionModel>> RunFallbackAsync() { List<PostionModel> model = new List<PostionModel>(); this.requests.ToList().Select(r => r.Argument).ToList().ForEach(r => { model.Add(new PostionModel() { Address = r, Lat = 0, Lng = 0 }); }); log.LogInformation("進入fallback進行降級處理"); return Task.FromResult(model); } }
4,使用者管理服務:新建Hystrix合併命令
public class PositionHystrixCollapser : HystrixCollapser<List<PostionModel>, PostionModel, string> { ILogger<PositionHystrixCollapser> logger; PositionServiceRemoting positionServiceRemoting; ILoggerFactory loggerFactory; public string Address { get; set; } public override string RequestArgument { get { return Address; } } public PositionHystrixCollapser( HystrixCollapserOptions options, PositionServiceRemoting positionServiceRemoting, ILogger<PositionHystrixCollapser> logger, ILoggerFactory loggerFactory ) : base(options) { this.logger = logger; this.loggerFactory = loggerFactory; this.positionServiceRemoting = positionServiceRemoting; } public async Task<PostionModel> GetPositionAsync(string address) { this.Address = address; return await this.ExecuteAsync(); } protected override HystrixCommand<List<PostionModel>> CreateCommand(ICollection<ICollapsedRequest<PostionModel, string>> requests) { var command = new BatPositionServiceCommand(HystrixCommandGroupKeyDefault.AsKey("BatPositionService"), positionServiceRemoting, requests,loggerFactory.CreateLogger<BatPositionServiceCommand>()); return command; } protected override void MapResponseToRequests(List<PostionModel> batchResponse, ICollection<ICollapsedRequest<PostionModel, string>> requests) { foreach (var req in requests) { var exsit = batchResponse.Where(r => r.Address == req.Argument).FirstOrDefault(); if (exsit != null) { req.Response = exsit; } } } }
5,新增命令及合併命令到容器
Startup.Configservices
services.AddHttpClient("positionservice").AddServiceDiscovery().AddTypedClient<PositionServiceRemoting>(); services.AddHystrixCollapser<PositionHystrixCollapser>("PostionCollapser", Configuration); services.AddHystrixCommand<IPositionServiceCommand, PositionServiceCommand>("positionHystrix", Configuration); services.AddHystrixCommand<BatPositionServiceCommand>("BatPositionService", Configuration);
Startup.Config
app.UseHystrixRequestContext();
6,使用:
readonly PositionHystrixCollapser positionHystrixCollapser; public UserController(IPositionServiceCommand positionServiceCommand,PositionHystrixCollapser positionHystrixCollapser) { this.positionServiceCommand = positionServiceCommand; this.positionHystrixCollapser = positionHystrixCollapser; } [HttpGet("GetAddress2")] public async Task<PostionModel> GetAddress2Async(string address) { return await positionHystrixCollapser.GetPositionAsync(address); }
7,配置引數
//批量請求佇列最大請求數量 maxRequestsInBatch //時間視窗,在該視窗內的請求將會合並 timerDelayInMilliseconds
四,啟用UI介面
1,安裝Nuget包:Steeltoe.CircuitBreaker.Hystrix.MetricsEventsCore
2,啟用Metrics
Startup.Configservices
services.AddHystrixMetricsStream(Configuration);
Startup.Config
app.UseMvc();
app.UseHystrixMetricsStream();
3,建立一個spring-boot專案,引用:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency>
配置
server: port: 8017 spring: application: name: Hystrix-Dashboard hystrix: dashboard: proxy-stream-allow-list: "*"
啟動後開啟地址:http://localhost:8017/hystrix,在流源輸入行中輸入使用者管理服務地址:http://localhost:8013/hystrix/hystrix.stream
點選Monitor Stream