Hystrix命令執行流程
前言
Hystrix已經不在維護了,但是成功的開源專案總是值得學習的.剛開始看 Hystrix 原始碼時,會發現一堆 Action,Function 的邏輯,這其實就是 RxJava 的特點了--響應式程式設計.上篇文章已經對RxJava作過入門介紹,不熟悉的同學可以先去看看.本文會簡單介紹 Hystrix,再根據demo結合原始碼來瞭解Hystrix的執行流程.
Hystrix簡單介紹
-
什麼是 Hystrix?
Hystrix 是一個延遲和容錯庫,旨在隔離對遠端系統、服務和第三方庫的訪問點,停止級聯故障,並在錯誤不可避免的複雜分散式系統中能夠彈性恢復。
-
核心概念
-
Command 命令
Command 是Hystrix的入口,對使用者來說,我們只需要建立對應的 command,將需要保護的介面包裝起來就可以.可以無需關注再之後的邏輯.與 Spring 深度整合後還可以通過註解的方式,就更加對開發友好了.
-
Circuit Breaker 斷路器
斷路器,是從電氣領域引申過來的概念,具有過載、短路和欠電壓保護功能,有保護線路和電源的能力.在Hystrix中即為當請求超過一定比例響應失敗時,hystrix 會對請求進行攔截處理,保證服務的穩定性,以及防止出現服務之間級聯雪崩的可能性.
-
Isolation 隔離策略
隔離策略是 Hystrix 的設計亮點所在,利用艙壁模式的思想來對訪問的資源進行隔離,每個資源是獨立的依賴,單個資源的異常不應該影響到其他. Hystrix 的隔離策略目前有兩種:執行緒池隔離
-
-
Hystrix的執行流程
官方的 How it Works 對流程有很詳細的介紹,圖示清晰,相信看完流程圖就能對執行流程有一定的瞭解.
一次Command執行
HystrixCommand
是標準的命令模式實現,每一次請求即為一次命令的建立執行經歷的過程.從上述Hystrix流程圖可以看出建立流程最終會指向toObservable
,在之前RxJava入門時有介紹到Observable
即為被觀察者,作用是傳送資料給觀察者進行相應的,因此可以知道這個方法應該是較為關鍵的.
UML
- HystrixInvokable 標記這個一個可執行的介面,沒有任何抽象方法或常量
- HystrixExecutable 是為
HystrixCommand
設計的介面,主要提供執行命令的抽象方法,例如:execute()
,queue()
,observe()
- HystrixObservable 是為
Observable
設計的介面,主要提供自動訂閱(observe()
)和生成Observable(toObservable()
)的抽象方法 - HystrixInvokableInfo 提供大量的狀態查詢(獲取屬性配置,是否開啟斷路器等)
- AbstractCommand 核心邏輯的實現
- HystrixCommand 定製邏輯實現以及留給使用者實現的介面(比如:
run()
)
樣例程式碼
通過新建一個 command 來看 Hystrix 是如何建立並執行的.HystrixCommand 是一個抽象類,其中有一個run
方法需要我們實現自己的業務邏輯,以下是偷懶採用匿名內部類的形式呈現.構造方法的內部實現我們就不關注了,直接看下執行的邏輯吧.
HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("demo-group")) {
@Override
protected String run() {
return "Hello World~";
}
};
demo.execute();
複製程式碼
執行過程
流程圖
這是官方給出的一次完整呼叫的鏈路.上述的 demo 中我們直接呼叫了execute
方法,所以呼叫的路徑為execute() -> queue() -> toObservable() -> toBlocking() -> toFuture() -> get()
.核心的邏輯其實就在toObservable()
中.
HystrixCommand.java
execute
execute
方法為同步呼叫返回結果,並對異常作處理.內部會呼叫queue
// 同步呼叫執行
public R execute() {
try {
// queue()返回的是Future型別的物件,所以這裡是阻塞get
return queue().get();
} catch (Exception e) {
throw decomposeException(e);
}
}
複製程式碼
queue
queue
的第一行程式碼完成了核心的訂閱邏輯.
-
toObservable()
生成了 Hystrix 的 Observable 物件 - 將
Observable
轉換為BlockingObservable
可以阻塞控制資料傳送 -
toFuture
實現對BlockingObservable
的訂閱
public Future<R> queue() {
// 著重關注的是這行程式碼
// 完成了Observable的建立及訂閱
// toBlocking()是將Observable轉為BlockingObservable,轉換後的Observable可以阻塞資料的傳送
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// 由於toObservable().toBlocking().toFuture()返回的Future如果中斷了,
// 不會對當前執行緒進行中斷,所以這裡將返回的Future進行了再次包裝,處理異常邏輯
...
}
// 判斷是否已經結束了,有異常則直接丟擲
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
// 省略這段判斷
}
}
return f;
}
複製程式碼
BlockingObservable.java
// 被包裝的Observable
private final Observable<? extends T> o;
// toBlocking()會呼叫該靜態方法將 源Observable簡單包裝成BlockingObservable
public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
return new BlockingObservable<T>(o);
}
public Future<T> toFuture() {
return BlockingOperatorToFuture.toFuture((Observable<T>)o);
}
複製程式碼
BlockingOperatorToFuture.java
The
toFuture
operator applies to theBlockingObservable
subclass,so in order to use it,you must first convert your source Observable into aBlockingObservable
by means of either theBlockingObservable.from
method or theObservable.toBlocking
operator.
toFuture
只能作用於BlockingObservable
所以也才會有上文想要轉換為BlockingObservable的操作
// 該操作將 源Observable轉換為返回單個資料項的Future
public static <T> Future<T> toFuture(Observable<? extends T> that) {
// CountDownLatch 判斷是否完成
final CountDownLatch finished = new CountDownLatch(1);
// 儲存執行結果
final AtomicReference<T> value = new AtomicReference<T>();
// 儲存錯誤結果
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
// single()方法可以限制Observable只傳送單條資料
// 如果有多條資料 會拋 IllegalArgumentException
// 如果沒有資料可以傳送 會拋 NoSuchElementException
@SuppressWarnings("unchecked")
final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {
// single()返回的Observable就可以對其進行標準的處理了
@Override
public void onCompleted() {
finished.countDown();
}
@Override
public void onError(Throwable e) {
error.compareAndSet(null,e);
finished.countDown();
}
@Override
public void onNext(T v) {
// "single" guarantees there is only one "onNext"
value.set(v);
}
});
// 最後將Subscription返回的資料封裝成Future,實現對應的邏輯
return new Future<T>() {
// 可以檢視原始碼
};
}
複製程式碼
AbstractCommand.java
AbstractCommand
是toObservable
實現的地方,屬於Hystrix的核心邏輯,程式碼較長,可以和方法呼叫的流程圖一起食用.toObservable
主要是完成快取和建立Observable,requestLog的邏輯,當第一次建立Observable時,applyHystrixSemantics
方法是Hystrix的語義實現,可以跳著看.
tips: 下文中有很多 Action和 Function,他們很相似,都有call方法,但是區別在於Function有返回值,而Action沒有,方法後跟著的數字代表有幾個入參.Func0/Func3即沒有入參和有三個入參
toObservable
toObservable
程式碼較長且分層還是清晰的,所以下面一塊一塊寫.其邏輯和文章開始提到的Hystrix流程圖是完全一致的.
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// 此處省略掉了很多個Action和Function,大部分是來做掃尾清理的函式,所以用到的時候再說
// defer在上篇rxjava入門中提到過,是一種建立型的操作符,每次訂閱時會產生新的Observable,回撥方法中所實現的才是真正我們需要的Observable
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 校驗命令的狀態,保證其只執行一次
if (!commandState.compareAndSet(CommandState.NOT_STARTED,CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION,_cmd.getClass(),getLogMessagePrefix() + " command executed multiple times - this is not permitted.",ex,null);
}
commandStartTimestamp = System.currentTimeMillis();
// properties為當前command的所有屬性
// 允許記錄請求log時會儲存當前執行的command
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
// 是否開啟了請求快取
final boolean requestCacheEnabled = isRequestCachingEnabled();
// 獲取快取key
final String cacheKey = getCacheKey();
// 開啟快取後,嘗試從快取中取
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
}
}
// 沒有開啟請求快取時,就執行正常的邏輯
Observable<R> hystrixObservable =
// 這裡又通過defer建立了我們需要的Observable
Observable.defer(applyHystrixSemantics)
// 傳送前會先走一遍hook,預設executionHook是空實現的,所以這裡就跳過了
.map(wrapWithAllOnNextHooks);
// 得到最後的封裝好的Observable後,將其放入快取
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable,_cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey,toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
// 終止時的操作
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line),or unsubscribe (next line))
// 取消訂閱時的操作
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
// 完成時的操作
.doOnCompleted(fireOnCompletedHook);
}
}
複製程式碼
handleRequestCacheHitAndEmitValues
快取擊中時的處理
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache,final AbstractCommand<R> _cmd) {
try {
// Hystrix中有大量的hook 如果有心做二次開發的,可以利用這些hook做到很完善的監控
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit",hookEx);
}
// 將快取的結果賦給當前command
return fromCache.toObservableWithStateCopiedInto(this)
// doOnTerminate 或者是後面看到的doOnUnsubscribe,doOnError,都指的是在響應onTerminate/onUnsubscribe/onError後的操作,即在Observable的生命週期上註冊一個動作優雅的處理邏輯
.doOnTerminate(new Action0() {
@Override
public void call() {
// 命令最終狀態的不同進行不同處理
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED,CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED,CommandState.TERMINAL)) {
cleanUpAfterResponseFromCache(true); //user code did run
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
// 命令最終狀態的不同進行不同處理
if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED,CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(false); //user code never ran
} else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED,CommandState.UNSUBSCRIBED)) {
cleanUpAfterResponseFromCache(true); //user code did run
}
}
});
}
複製程式碼
applyHystrixSemantics
因為本片文章的主要目的是在講執行流程,所以失敗回退和斷路器相關的就留到以後的文章中再寫.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 不再訂閱了就返回不傳送資料的Observable
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
// 不傳送任何資料或通知
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 標記開始執行的hook
// 如果hook內拋異常了,會快速失敗且沒有fallback處理
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
// 斷路器核心邏輯: 判斷是否允許執行(TODO)
if (circuitBreaker.allowRequest()) {
// Hystrix自己造的訊號量輪子,之所以不用juc下,官方解釋為juc的Semphore實現太複雜,而且沒有動態調節的訊號量大小的能力,簡而言之,不滿足需求!
// 根據不同隔離策略(執行緒池隔離/訊號量隔離)獲取不同的TryableSemphore
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// Semaphore釋放標誌
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// 釋放訊號量的Action
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false,true)) {
executionSemaphore.release();
}
}
};
// 異常處理
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
// HystrixEventNotifier是hystrix的外掛,不同的事件傳送不同的通知,預設是空實現.
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN,commandKey);
}
};
// 執行緒池隔離的TryableSemphore始終為true
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
// executionResult是一次命令執行的結果資訊封裝
// 這裡設定起始時間是為了記錄命令的生命週期,執行過程中會set其他屬性進去
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
// 報錯時的處理
.doOnError(markExceptionThrown)
// 終止時釋放
.doOnTerminate(singleSemaphoreRelease)
// 取消訂閱時釋放
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// tryAcquire失敗後會做fallback處理,TODO
return handleSemaphoreRejectionViaFallback();
}
} else {
// 斷路器短路(拒絕請求)fallback處理 TODO
return handleShortCircuitViaFallback();
}
}
複製程式碼
executeCommandAndObserve
/**
* 執行run方法的地方
*/
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// 獲取當前上下文
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// 傳送資料時的Action響應
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
// 如果onNext時需要上報時,做以下處理
if (shouldOutputOnNextEvents()) {
// result標記
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
// 通知
eventNotifier.markEvent(HystrixEventType.EMIT,commandKey);
}
// commandIsScalar是一個我不解的地方,在網上也沒有查到好的解釋
// 該方法為抽象方法,有HystrixCommand實現返回true.HystrixObservableCommand返回false
if (commandIsScalar()) {
// 耗時
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// 通知
eventNotifier.markCommandExecution(getCommandKey(),properties.executionIsolationStrategy().get(),(int) latency,executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS,commandKey);
executionResult = executionResult.addEvent((int) latency,HystrixEventType.SUCCESS);
// 斷路器標記成功(斷路器半開時的反饋,決定是否關閉斷路器)
circuitBreaker.markSuccess();
}
}
};
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
// 同markEmits 類似處理
}
}
};
// 失敗回退的邏輯
final Func1<Throwable,Observable<R>> handleFallback = new Func1<Throwable,Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
// 不是重點略過了
}
};
// 請求上下文的處理
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
// 如果有執行超時限制,會將包裝後的Observable再轉變為支援TimeOut的
if (properties.executionTimeoutEnabled().get()) {
// 根據不同的隔離策略包裝為不同的Observable
execution = executeCommandWithSpecifiedIsolation(_cmd)
// lift 是rxjava中一種基本操作符 可以將Observable轉換成另一種Observable
// 包裝為帶有超時限制的Observable
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
複製程式碼
executeCommandWithSpecifiedIsolation
根據不同的隔離策略建立不同的執行Observable
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// 由於原始碼太長,這裡只關注正常的流程,需要詳細瞭解可以去看看原始碼
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD,ThreadState.STARTED)) {
try {
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed,so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}})
.doOnTerminate(new Action0() {})
.doOnUnsubscribe(new Action0() {})
// 指定在某一個執行緒上執行,是rxjava中很重要的執行緒排程的概念
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
}));
} else { // 訊號量隔離策略
return Observable.defer(new Func0<Observable<R>>() {
// 邏輯與執行緒池大致相同
});
}
}
複製程式碼
getUserExecutionObservable
獲取使用者執行的邏輯
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// getExecutionObservable是抽象方法,有HystrixCommand自行實現
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
// 將Observable作其他中轉
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
複製程式碼
lift操作符
lift可以轉換成一個新的Observable,它很像一個代理,將原來的Observable代理到自己這裡,訂閱時通知原來的Observable傳送資料,經自己這裡流轉加工處理再返回給訂閱者.Map/FlatMap
操作符底層其實就是用的lift
進行實現的.
getExecutionObservable
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// just操作符就是直接執行的Observable
// run方法就是我們實現的業務邏輯: Hello World~
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// 執行訂閱時將執行執行緒記為當前執行緒,必要時我們可以interrupt
executionThread.set(Thread.currentThread());
}
});
}
複製程式碼
總結
希望自己能把埋下的坑一一填完: 容錯機制,metrics,斷路器等等...