1. 程式人生 > >Hystrix 原始碼解析 —— 請求執行(四)之失敗回退邏輯

Hystrix 原始碼解析 —— 請求執行(四)之失敗回退邏輯

本文主要基於 Hystrix 1.5.X 版本

1. 概述

本文主要分享 Hystrix 命令執行(四)之失敗回退邏輯

建議 :對 RxJava 已經有一定的瞭解的基礎上閱讀本文。

Hystrix 執行命令整體流程如下圖:

  • 圈 :Hystrix 命令執行失敗,執行回退邏輯。也就是大家經常在文章中看到的“服務降級”
  • 圈 :四種情況會觸發失敗回退邏輯( fallback )。
    • 第六種 :bad-request ,TODO 【2014】【HystrixBadRequestException】,和 hystrix-javanica 子專案相關。

另外,#handleXXXX()

方法,整體程式碼比較類似,最終都是呼叫 #getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。

推薦 Spring Cloud 書籍

2. handleFallback

《Hystrix 原始碼解析 —— 命令執行(一)之正常執行邏輯》「4. #executeCommandAndObserve(…)」 中,#executeCommandAndObserve(...)第 82 行 onErrorResumeNext(handleFallback)

程式碼,通過呼叫 Observable#onErrorResumeNext(...) 方法,實現【執行命令 Observable】執行異常時,返回【回退邏輯 Observable】,執行失敗回退邏輯。

 1: final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { 2:     @Override 3:     public Observable<R> call(Throwable t) { 4
: // 標記嘗試成功 5: circuitBreaker.markNonSuccess(); 6: // 標記 executionResult 執行異常 7: Exception e = getExceptionFromThrowable(t); 8: executionResult = executionResult.setExecutionException(e); 9: // 返回 【回退邏輯 Observable】10: if (e instanceof RejectedExecutionException) { // 執行緒池提交任務拒絕異常11: return handleThreadPoolRejectionViaFallback(e);12: } else if (t instanceof HystrixTimeoutException) { // 執行命令超時異常13: return handleTimeoutViaFallback();14: } else if (t instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】15: return handleBadRequestByEmittingError(e);16: } else {17: /18: Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.19: */20: if (e instanceof HystrixBadRequestException) { // TODO 【2014】【HystrixBadRequestException】21: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);22: return Observable.error(e);23: }24: 25: return handleFailureViaFallback(e);26: }27: }28: };
  • 第 7 至 8 行 :標記 executionResult 執行異常
  • 第 14 至 23 行 :,bad-request ,TODO 【2014】【HystrixBadRequestException】,和 hystrix-javanica 子專案相關。

3. #handleShortCircuitViaFallback()

#handleShortCircuitViaFallback() 方法,short-circuit ,處理鏈路處於熔斷的回退邏輯,在 此處 被呼叫,程式碼如下 :


 1: private Observable<R> handleShortCircuitViaFallback() { 2:     // TODO 【2011】【Hystrix 事件機制】 3:     // record that we are returning a short-circuited fallback 4:     eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey); 5:     // 標記 executionResult 執行異常 6:     // short-circuit and go directly to fallback (or throw an exception if no fallback implemented) 7:     Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN"); 8:     executionResult = executionResult.setExecutionException(shortCircuitException); 9:     try {10:         // 獲得 【回退邏輯 Observable】 或者 【異常 Observable】11:         return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,12:                 "short-circuited", shortCircuitException);13:     } catch (Exception e) {14:         return Observable.error(e);15:     }16: }
  • 第 4 行 :TODO 【2011】【Hystrix 事件機制】
  • 第 7 至 8 行 :標記 executionResult 執行異常
  • 第 11 至 12 行 :呼叫 #getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。
  • 第 14 行 :返回【異常 Observable】。

4. #handleSemaphoreRejectionViaFallback()

#handleSemaphoreRejectionViaFallback() 方法,semaphore-rejection ,處理訊號量獲得失敗的回退邏輯,在 此處 被呼叫,程式碼如下 :


 1: private Observable<R> handleSemaphoreRejectionViaFallback() { 2:     // 標記 executionResult 執行異常 3:     Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution"); 4:     executionResult = executionResult.setExecutionException(semaphoreRejectionException); 5:     // TODO 【2011】【Hystrix 事件機制】 6:     eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey); 7:     logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it 8:     // retrieve a fallback or throw an exception if no fallback available 9:     // 獲得 【回退邏輯 Observable】 或者 【異常 Observable】10:     return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,11:             "could not acquire a semaphore for execution", semaphoreRejectionException);12: }
  • 第 3 至 4 行 :標記 executionResult 執行異常
  • 第 6 至 7 行 :TODO 【2011】【Hystrix 事件機制】
  • 第 10 至 11 行 :呼叫 #getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。

5. #handleThreadPoolRejectionViaFallback()

#handleThreadPoolRejectionViaFallback() 方法,thread-pool-rejection ,處理執行緒池提交任務拒絕的回退邏輯,在 此處 被呼叫,程式碼如下:


 1: private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) { 2:     // TODO 【2011】【Hystrix 事件機制】 3:     eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey); 4:     // TODO 【2002】【metrics】 5:     threadPool.markThreadRejection(); 6:     // 獲得 【回退邏輯 Observable】 或者 【異常 Observable】 7:     // use a fallback instead (or throw exception if not implemented) 8:     return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, 9:             "could not be queued for execution", underlying);10: }
  • 第 3 行 :TODO 【2011】【Hystrix 事件機制】
  • 第 5 行 :TODO 【2002】【metrics】
  • 第 8 至 9 行 :呼叫 #getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。

6. #handleTimeoutViaFallback()

#handleTimeoutViaFallback() 方法,execution-timeout ,處理命令執行超時的回退邏輯,在 此處 被呼叫,程式碼如下:


1: private Observable<R> handleTimeoutViaFallback() {2:     // 獲得 【回退邏輯 Observable】 或者 【異常 Observable】3:     return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,4:             "timed-out", new TimeoutException());5: }

7. #handleFailureViaFallback()

#handleFailureViaFallback() 方法,execution-failure ,處理命令執行異常的回退邏輯,在 此處 被呼叫,程式碼如下:


 1: private Observable<R> handleFailureViaFallback(Exception underlying) { 2:     // TODO 【2011】【Hystrix 事件機制】 3:     /* 4:       All other error handling 5:      */ 6:     logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying); 7:  8:     // report failure 9:     eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);10: 11:     // 標記 executionResult 異常 TODO 【2007】【executionResult】用途 為啥不是執行異常12:     // record the exception13:     executionResult = executionResult.setException(underlying);14:     // 獲得 【回退邏輯 Observable】 或者 【異常 Observable】15:     return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);16: }
  • 第 2 至 9 行 :TODO 【2011】【Hystrix 事件機制】
  • 第 13 行 :標記 executionResult 異常
  • 第 15 行 :呼叫 #getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,在 「8. #getFallbackOrThrowException(…)」 詳細解析。

8. #getFallbackOrThrowException(…)

#getFallbackOrThrowException() 方法,獲得【回退邏輯 Observable】或者【異常 Observable】,程式碼如下 :


  1: private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {  2:     // 記錄 HystrixRequestContext  3:     final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();  4:     // 標記 executionResult 新增( 記錄 )事件  5:     long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();  6:     // record the executionResult  7:     // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)  8:     executionResult = executionResult.addEvent((int) latency, eventType);  9:  10:     if (isUnrecoverable(originalException)) { // 無法恢復的異常 11:         logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException); 12:  13:         // TODO 【2003】【HOOK】 14:         /* executionHook for all errors / 15:         Exception e = wrapWithOnErrorHook(failureType, originalException); 16:         // 返回 【異常 Observable】 17:         return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null)); 18:     } else { 19:         if (isRecoverableError(originalException)) { // 可恢復的異常 20:             logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException); 21:         } 22:  23:         if (properties.fallbackEnabled().get()) { 24:             / fallback behavior is permitted so attempt / 25:  26:             // 設定 HystrixRequestContext 的 Action 27:             final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { 28:                 @Override 29:                 public void call(Notification<? super R> rNotification) { 30:                     setRequestContextIfNeeded(requestContext); 31:                 } 32:             }; 33:  34:             // TODO 【2007】【executionResult】用途 35:             final Action1<R> markFallbackEmit = new Action1<R>() { 36:                 @Override 37:                 public void call(R r) { 38:                     if (shouldOutputOnNextEvents()) { 39:                         executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT); 40:                         eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey); 41:                     } 42:                 } 43:             }; 44:  45:             // TODO 【2007】【executionResult】用途 46:             final Action0 markFallbackCompleted = new Action0() { 47:                 @Override 48:                 public void call() { 49:                     long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); 50:                     eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey); 51:                     executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS); 52:                 } 53:             }; 54:  55:             // 處理異常 的 Func 56:             final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() { 57:                 @Override 58:                 public Observable<R> call(Throwable t) { 59:                     // TODO 【2003】【HOOK】 60:                     / executionHook for all errors */ 61:                     Exception e = wrapWithOnErrorHook(failureType, originalException); 62:                     // 獲得 Exception 63:                     Exception fe = getExceptionFromThrowable(t); 64:  65:                     long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); 66:                     Exception toEmit; 67:  68:                     if (fe instanceof UnsupportedOperationException) { 69:                         // TODO 【2011】【Hystrix 事件機制】 70:                         logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it 71:                         eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey); 72:                         // 標記 executionResult 新增( 記錄 )事件 HystrixEventType.FALLBACK_MISSING 73:                         executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING); 74:  75:                         // 建立 HystrixRuntimeException 76:                         toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe); 77:                     } else { 78:                         // TODO 【2011】【Hystrix 事件機制】 79:                         logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe); 80:                         eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey); 81:                         // 標記 executionResult 新增( 記錄 )事件 HystrixEventType.FALLBACK_FAILURE 82:                         executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE); 83:  84:                         // 建立 HystrixRuntimeException 85:                         toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe); 86:                     } 87:  88:                     // NOTE: we're suppressing fallback exception here 89:                     if (shouldNotBeWrapped(originalException)) { 90:                         return Observable.error(e); 91:                     } 92:  93:                     return Observable.error(toEmit); 94:                 } 95:             }; 96:  97:             // 獲得 TryableSemaphore 98:             final TryableSemaphore fallbackSemaphore = getFallbackSemaphore(); 99: 100:             // 訊號量釋放Action101:             final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);102:             final Action0 singleSemaphoreRelease = new Action0() {103:                 @Override104:                 public void call() {105:                     if (semaphoreHasBeenReleased.compareAndSet(false, true)) {106:                         fallbackSemaphore.release();107:                     }108:                 }109:             };110: 111:             Observable<R> fallbackExecutionChain;112: 113:             // acquire a permit114:             if (fallbackSemaphore.tryAcquire()) {115:                 try {116:                     if (isFallbackUserDefined()) {117:                         executionHook.onFallbackStart(this);118:                         fallbackExecutionChain = getFallbackObservable();119:                     } else {120:                         //same logic as above without the hook invocation121:                         fallbackExecutionChain = getFallbackObservable();122:                     }123:                 } catch (Throwable ex) {124:                     //If hook or user-fallback throws, then use that as the result of the fallback lookup125:                     fallbackExecutionChain = Observable.error(ex);126:                 }127: 128:                 // 獲得 【回退邏輯 Observable】129:                 return fallbackExecutionChain130:                         .doOnEach(setRequestContext)131:                         .lift(new FallbackHookApplication(_cmd)) // TODO 【2003】【HOOK】132:                         .lift(new DeprecatedOnFallbackHookApplication(_cmd))133:                         .doOnNext(markFallbackEmit)134:                         .doOnCompleted(markFallbackCompleted)135:                         .onErrorResumeNext(handleFallbackError) //136:                         .doOnTerminate(singleSemaphoreRelease)137:                         .doOnUnsubscribe(singleSemaphoreRelease);138:             } else {139:                return handleFallbackRejectionByEmittingError();140:             }141:         } else {142:             return handleFallbackDisabledByEmittingError(originalException, failureType, message);143:         }144:     }145: }
  • 耐心,這個方法看起來灰常長,也僅限於長,理解成難度很小。
  • 第 3 行 :記錄 HystrixRequestContext 。
  • 第 5 至 8 行 :標記 executionResult 新增( 記錄 )事件。
  • 第 10 至 17 行 :呼叫 #isUnrecoverable(Exception) 方法,若異常不可恢復,直接返回【異常 Observable】。點選 連結 檢視該方法。
  • 第 19 至 21 行 :呼叫 #isRecoverableError(Exception) 方法,若異常可恢復,列印 WARN 日誌。點選 連結 檢視該方法。主要針對 java.lang.Error 情況,列印 #isUnrecoverable(Exception) 排除掉的 Error。
  • 反向】第 141 至 143 行 :當配置 HystrixCommandProperties.fallbackEnabled = false ( 預設值 :true ) ,即失敗回退功能關閉,呼叫 #handleFallbackDisabledByEmittingError() ,返回【異常 Observable】。點選 連結 檢視該方法。
  • 反向】第 138 至 140 行 :失敗回退訊號量( TryableSemaphore )【注意,不是正常執行訊號量】使用失敗,呼叫 #handleFallbackRejectionByEmittingError() ,返回【異常 Observable】。點選 連結 檢視該方法。
  • 第 23 行 :當配置 HystrixCommandProperties.fallbackEnabled = true ( 預設值 :true ) ,即失敗回退功能開啟
  • 第 27 至 32 行 :設定 HystrixRequestContext 的 Action ,使用第 3 行記錄的 HystrixRequestContext 。
  • 第 35 至 43 行 :TODO 【2007】【executionResult】用途
  • 第 46 至 53 行 :TODO 【2007】【executionResult】用途
  • 第 56 至 95 行 :處理回退邏輯執行發生異常的 Func1 ,返回【異常 Observable】。

    • 第 61 行 :TODO 【2003】【HOOK】
    • 第 63 行 :呼叫 #getExceptionFromThrowable(Throwable) 方法,獲得 Exception 。若 t型別為 Throwable 時,包裝成 Exception 。點選 連結 檢視該方法程式碼。
    • 第 68 至 76 行 :當 fe型別為 UnsupportedOperationException 時,使用 e + fe 建立 HystrixRuntimeException 。該異常發生於 HystrixCommand#getFallback() 抽象方法未被覆寫。
    • 第 77 至 86 行 :當 fe型別為其他異常時,使用 e + fe 建立 HystrixRuntimeException 。該異常發生於 HystrixCommand#getFallback() 執行發生異常。
    • 第 89 至 91 行 :呼叫 #shouldNotBeWrapped() 方法,判斷 originalException 是 ExceptionNotWrappedByHystrix 的實現時,即要求返回的【異常 Observable】不使用 HystrixRuntimeException 包裝。點選 連結 檢視該方法程式碼。
    • 第 93 行 :返回【異常 Observable】,使用 toEmit ( HystrixRuntimeException ) 為異常。
  • 第 98 行 :呼叫 #getFallbackSemaphore() 方法,獲得失敗回退訊號量( TryableSemaphore )物件,點選 連結 檢視該方法程式碼。TryableSemaphore 在 《Hystrix 原始碼解析 —— 命令執行(一)之正常執行邏輯》「3. TryableSemaphore」 有詳細解析。


  • 第 100 至 109 行 :訊號量釋放的 Action。
  • 第 114 至 137 行 :失敗回退訊號量( TryableSemaphore )使用成功,返回【回退邏輯 Observable】。

    • 【重要】第 116 至 122 行 :呼叫 #getFallbackObservable() 方法,建立【回退邏輯 Observable】。將子類對 HystrixCommand#getFallback() 抽象方法的執行結果,使用 Observable#just(...) 包裝返回。點選 連結 檢視該方法的程式碼。

      • 第 116 行 :呼叫 #isFallbackUserDefined() 方法,返回命令子類是否實現 HystrixCommand#getFallback() 抽象方法。只有已實現( true ) 的情況下,呼叫 HOOK TODO 【2003】【HOOK】。

    • 第 129 至 137 行 :獲得 【回退邏輯 Observable】。

      • 第 131 行 :// TODO 【2003】【HOOK】
      • 第 135 行 :呼叫 Observable#onErrorResumeNext(...) 方法,實現【失敗回退 Observable】執行異常時,返回【異常 Observable】。


有兩個注意點:

  • 當命令執行超時時,失敗回退邏輯使用的是 HystrixTimer 的執行緒池
  • 失敗回退邏輯,無超時時間,使用要小心。