十、Spring cloud服務短路(Hystrix)之原始碼解析
阿新 • • 發佈:2018-12-21
- Spring Cloud Hystrix 原始碼解讀
- Nertflix Hystrix 原始碼解讀
- RxJava 基礎
一、Spring Cloud Hystrix 原始碼解讀
1、@EnableCircuitBreaker
(1)職責:
啟用 Circuit Breaker
(2)呼叫鏈路:
@EnableCircuitBreaker
<!-- 通過 EnableCircuitBreaker 註解上的註解 @Import(EnableCircuitBreakerImportSelector.class) 可知 -->
-> EnableCircuitBreakerImportSelector
<!--
EnableCircuitBreakerImportSelector 繼承 SpringFactoryImportSelector<EnableCircuitBreaker> ,通過探尋
發現,SpringFactoryImportSelector 類下的 selectImports() 方法中的
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
可知,這裡是以 EnableCircuitBreaker 全限定名為key,找到對應的預設實現。通過
搜尋 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 找到 spring cloud 框架下的
spring.factories 檔案中,找到了其預設實現:
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
-->
-> HystrixCircuitBreakerConfiguration
2、HystrixCircuitBreakerConfiguration
(1)初始化元件
- HystrixCommandAspect
- HystrixShutdownHook
- HasFeatures
二、Nertflix Hystrix 原始碼解讀
1、HystrixCommandAspect
(1)依賴元件
- MetaHolderFactory:生成攔截方法元資訊
- HystrixCommandFactory:生成 HystrixInvokable
- HystrixInvokable
-
- CommandCollapser
-
- GenericObservableCommand
-
- GenericCommand
2、Future 來實現超時熔斷
/** * 通過 {@link Future} 實現 服務熔斷 * @author 鹹魚 * @date 2018/11/14 20:12 */ public class FutureCircuitBreakerDeo { public static void main(String[] args) throws ExecutionException, InterruptedException { //初始化執行緒池 ExecutorService executorService = Executors.newSingleThreadExecutor(); RandomCommand randomCommand = new RandomCommand(); Future<String> future = executorService.submit(() -> { //獲取 run 方法計算結果 return randomCommand.run(); }); String result = null; // 100 ms 超時時間 try { result = future.get(100, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // fallback 方法呼叫 result = randomCommand.fallback(); } System.out.println(result); executorService.shutdown(); } /** * 隨即物件 */ private static final Random RANDOM = new Random(); /** * 隨機事件執行命令 */ static class RandomCommand implements Command<String> { @Override public String run() throws InterruptedException { long executeTime = RANDOM.nextInt(200); System.out.println("execute time : " + executeTime + "ms"); Thread.sleep(executeTime); return "hello"; } @Override public String fallback() { return "fallback"; } } public static interface Command<T> { /** * 正常執行,並且返回結果 * @return T */ T run() throws InterruptedException; /** * 錯誤時,返回容錯結果 * @return T */ T fallback(); } }
三、RxJava 基礎
1、單資料:Single API
//僅能釋出單個數據
Single.just("Hello,World!")
//在I/O執行緒執行
.subscribeOn(Schedulers.io())
//訂閱並且消費資料
.subscribe(RxJavaDemo::println);
Thread.sleep(100);
2、多資料:Observable API
List<Integer> values = Arrays.asList(1,2,3,4,5,6,7,8);
//釋出多個數據
Observable.from(values)
.subscribeOn(Schedulers.computation())
//訂閱並且消費資料
.subscribe(RxJavaDemo::println);
Thread.sleep(100);
3、使用標準 Reactive 模式:
public static void demoStandardReactive() throws InterruptedException {
List<Integer> values = Arrays.asList(1,2,3);
//釋出多個數據
Observable.from(values)
.subscribeOn(Schedulers.newThread())
//subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
//介面 Action1:需實現call(T t)
//介面 Action0:需實現call()
.subscribe(
//引數一:消費資料
value -> {
if (value > 2) {
throw new IllegalStateException("資料不容許大於2");
}
println("消費資料:" + value);
},
//引數二:當發生異常時,中斷執行
e -> println("發生異常:" + e.getMessage()),
//引數三:當邏輯執行完畢時
() -> println("邏輯執行完畢"))
;
//上面是非同步執行,需要休眠等待其執行完畢
Thread.sleep(100);
}