【SpringCloud實踐】之斷路器:Hystrix
一、服務雪崩效應
基礎服務的故障導致級聯故障,進而造成了整個分散式系統的不可用,這種現象被稱為服務雪崩效應。服務雪崩效應描述的是一種因服務提供者的不可用導致服務消費者的不可用,並將不可用逐漸放大的過程。
服務雪崩效應形成的原因
1、服務提供者不可用
- 硬體故障
- 程式Bug
- 快取擊穿
- 使用者大量請求
2、重試加大流量
- 使用者重試
- 程式碼邏輯重試
3、服務呼叫者不可用
- 同步等待造成的資源耗盡
服務雪崩的應對策略
1、流量控制
閘道器限流
使用者互動限流
關閉重試
2、改進快取模式
快取預載入
同步改為非同步重新整理
3、服務自動擴容
AWS的auto scaling
4、服務呼叫者降級服務
資源隔離
對依賴服務進行分類
不可用服務的呼叫快速失敗
二、hystrix的工作原理
Hystrix是Netflix開源的一個限流熔斷的專案、主要有以下功能:
- 隔離(執行緒池隔離和訊號量隔離):限制呼叫分散式服務的資源使用,某一個呼叫的服務出現問題不會影響其他服務呼叫。
- 優雅的降級機制:超時降級、資源不足時(執行緒或訊號量)降級,降級後可以配合降級介面返回託底資料。
-融斷:當失敗率達到閥值自動觸發降級(如因網路故障/超時造成的失敗率高),熔斷器觸發的快速失敗會進行快速恢復。
-快取:提供了請求快取、請求合併實現。支援實時監控、報警、控制(修改配置)
下面是他的工作流程:
Hystrix主要有4種呼叫方式:
- toObservable() 方法 :未做訂閱,只是返回一個Observable 。
- observe() 方法 :呼叫#toObservable() 方法,並向 Observable 註冊 rx.subjects.ReplaySubject 發起訂閱。
- queue() 方法 :呼叫 #toObservable() 方法的基礎上,呼叫:Observable#toBlocking() 和BlockingObservable#toFuture() 返回 Future 物件
- execute() 方法 :呼叫 #queue()方法的基礎上,呼叫 Future#get() 方法,同步返#run() 的執行結果。
主要的執行邏輯:
1.每次呼叫建立一個新的HystrixCommand,把依賴呼叫封裝在run()方法中.
2.執行execute()/queue做同步或非同步呼叫.
3.判斷熔斷器(circuit-breaker)是否開啟,如果開啟跳到步驟8,進行降級策略,如果關閉進入步驟.
4.判斷執行緒池/佇列/訊號量是否跑滿,如果跑滿進入降級步驟8,否則繼續後續步驟.
5.呼叫HystrixCommand的run方法.執行依賴邏輯,依賴邏輯呼叫超時,進入步驟8.
6.判斷邏輯是否呼叫成功。返回成功呼叫結果;調用出錯,進入步驟8.
7.計算熔斷器狀態,所有的執行狀態(成功, 失敗, 拒絕,超時)上報給熔斷器,用於統計從而判斷熔斷器狀態.
8.getFallback()降級邏輯。以下四種情況將觸發getFallback呼叫:
- run()方法丟擲非HystrixBadRequestException異常。
- run()方法呼叫超時
- 熔斷器開啟攔截呼叫
- 執行緒池/佇列/訊號量是否跑滿
- 沒有實現getFallback的Command將直接丟擲異常,fallback降級邏輯呼叫成功直接返回,降級邏輯呼叫失敗丟擲異常.
9.返回執行成功結果
三、HystrixCommand的使用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-hystrix-dashboard</artifactId>
</dependency>
建立一個CommandHelloworldWithFallBack類,繼承與HystrixCommand
import java.util.Arrays;
import java.util.List;
import com.netflix.hystrix.HystrixCommand;
public class CommandHelloworldWithFallBack extends HystrixCommand<String>{
public static List<Integer> times = Arrays.asList(100, 200, 300, 400, 500, 600,
700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100);
private String name;
private int timeIndex;
public CommandHelloworldWithFallBack(Setter setter, String name, int timeIndex) {
super(setter);
this.name = name;
this.timeIndex = timeIndex;
}
@Override
protected String getFallback() {
return "fall back timeMillSeconds is :" + times.get(timeIndex);
}
@Override
protected String run() {
try {
Thread.currentThread().sleep(times.get(this.getTimeIndex()));
} catch (InterruptedException e) {
}
return "ok timeMillSeconds is :" + times.get(timeIndex);
}
public int getTimeIndex() {
return timeIndex;
}
public void setTimeIndex(int timeIndex) {
this.timeIndex = timeIndex;
}
}
建立一個單元測試類,執行
import org.junit.Test;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
public class CommandHelloworldWithFallBackTest {
@Test
public void testHystrix() {
int count = 0;
while (true) {
String s = new CommandHelloworldWithFallBack(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("threadpoolwithfallback"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(10)
)
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(1000)
.withCircuitBreakerSleepWindowInMilliseconds(5000)
.withCircuitBreakerErrorThresholdPercentage(50)
.withCircuitBreakerRequestVolumeThreshold(1))
, "ccc", count % 20).execute();
System.out.println(s);
count++;
try {
Thread.currentThread().sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}
}
}
- HystrixCommand的子類需要實現兩個函式,run函式和getFallback函式,正常執行時,呼叫run函式,熔斷時呼叫getFallback
- 呼叫CommandHelloworldWithFallBack方法時,傳入Setter引數,對熔斷進行配置,設定超時時長為1秒,5秒熔斷時間窗檢查是否恢復。
執行可以看到,當run執行時長超過1秒時,系統就會熔斷轉而getFallback函式,並且每5秒檢查一次是否恢復。
ok timeMillSeconds is :800
ok timeMillSeconds is :900
ok timeMillSeconds is :1000
fall back timeMillSeconds is :1100
fall back timeMillSeconds is :1200
fall back timeMillSeconds is :1300
熔斷行為設定:
引數 | 描述 | 預設值 |
---|---|---|
circuitBreaker.enabled | 確定斷路器是否用於跟蹤執行狀況和短路請求(如果跳閘)。 | 預設值為true |
circuitBreaker.requestVolumeThreshold | 熔斷觸發的最小個數/10s | 預設值:20 |
circuitBreaker.sleepWindowInMilliseconds | 熔斷多少秒後去嘗試請求 | 預設值:5000 |
circuitBreaker.errorThresholdPercentage | 失敗率達到多少百分比後熔斷 | 預設值:50,主要根據依賴重要性進行調整 |
circuitBreaker.forceOpen | 屬性如果為真,強制斷路器進入開啟(跳閘)狀態,其中它將拒絕所有請求。 | 預設值為false,此屬性優先於circuitBreaker.forceClosed |
circuitBreaker.forceClosed | 該屬性如果為真,則迫使斷路器進入閉合狀態,其中它將允許請求,而不考慮誤差百分比。 | 預設值為false,如果是強依賴,應該設定為true,circuitBreaker.forceOpen屬性優先,因此如果forceOpen設定為true,此屬性不執行任何操作 |
註解方式
import java.util.concurrent.Future;
import org.springframework.stereotype.Service;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.command.AsyncResult;
@Service
public class HystrixCommandUserServices {
@HystrixCommand(fallbackMethod="getUserIdFallback")
public String getUserId(String name){
int i = 1/0;
return "你好" + name ;
}
@HystrixCommand(fallbackMethod="getUserIdFallbackTo")
public String getUserIdFallback(String name){
int i = 1/0;
return "getUserId Fail" ;
}
public String getUserIdFallbackTo(String name){
return "getUserId Fail2" ;
}
//非同步執行
@HystrixCommand(fallbackMethod="getUserIdFallback")
public Future<String> getUserName(final String name){
return new AsyncResult<String>(){
@Override
public String invoke() {
//int i = 1/0;
System.out.println("getUserName invoke!!!");
return "你好" + name ;
}
};
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.itmuch.cloud.ConsumerMovieRibbonApplication;
import com.itmuch.cloud.hystrix.HystrixCommandUserServices;
@RunWith(SpringRunner.class)
@SpringBootTest(classes=ConsumerMovieRibbonApplication.class)
public class HystrixCommandUserServicesTest {
@Autowired
private HystrixCommandUserServices hystrixCommandUserServices;
@Test
public void testUserServicegetUserId(){
String name = hystrixCommandUserServices.getUserId("Hello");
System.out.println("GetUserID=" + name);
}
@Test
public void testUserServicegetUserName() throws InterruptedException, ExecutionException{
System.out.println("testUserServicegetUserName Start!!!");
Future<String> future = hystrixCommandUserServices.getUserName("HystrixSync");
System.out.println("testUserServicegetUserName getUserName!");
System.out.println("ServicegetUserName" + future.get());
}
}