1. 程式人生 > 程式設計 >Hystrix命令執行流程

Hystrix命令執行流程

前言

Hystrix已經不在維護了,但是成功的開源專案總是值得學習的.剛開始看 Hystrix 原始碼時,會發現一堆 Action,Function 的邏輯,這其實就是 RxJava 的特點了--響應式程式設計.上篇文章已經對RxJava作過入門介紹,不熟悉的同學可以先去看看.本文會簡單介紹 Hystrix,再根據demo結合原始碼來瞭解Hystrix的執行流程.

Hystrix簡單介紹

  1. 什麼是 Hystrix?

    Hystrix 是一個延遲容錯庫,旨在隔離對遠端系統、服務和第三方庫的訪問點,停止級聯故障,並在錯誤不可避免的複雜分散式系統中能夠彈性恢復。

  2. 核心概念

    • Command 命令

      Command 是Hystrix的入口,對使用者來說,我們只需要建立對應的 command,將需要保護的介面包裝起來就可以.可以無需關注再之後的邏輯.與 Spring 深度整合後還可以通過註解的方式,就更加對開發友好了.

    • Circuit Breaker 斷路器

      斷路器,是從電氣領域引申過來的概念,具有過載短路欠電壓保護功能,有保護線路和電源的能力.在Hystrix中即為當請求超過一定比例響應失敗時,hystrix 會對請求進行攔截處理,保證服務的穩定性,以及防止出現服務之間級聯雪崩的可能性.

    • Isolation 隔離策略

      隔離策略是 Hystrix 的設計亮點所在,利用艙壁模式的思想來對訪問的資源進行隔離,每個資源是獨立的依賴,單個資源的異常不應該影響到其他. Hystrix 的隔離策略目前有兩種:執行緒池隔離

      ,訊號量隔離.

      isolation

  3. Hystrix的執行流程

    官方的 How it Works 對流程有很詳細的介紹,圖示清晰,相信看完流程圖就能對執行流程有一定的瞭解.

    來自hystrix的github站點

一次Command執行

HystrixCommand是標準的命令模式實現,每一次請求即為一次命令的建立執行經歷的過程.從上述Hystrix流程圖可以看出建立流程最終會指向toObservable,在之前RxJava入門時有介紹到Observable即為被觀察者,作用是傳送資料給觀察者進行相應的,因此可以知道這個方法應該是較為關鍵的.

UML

hystrixcommman-uml.png

  1. HystrixInvokable 標記這個一個可執行的介面,沒有任何抽象方法或常量
  2. HystrixExecutable 是為HystrixCommand設計的介面,主要提供執行命令的抽象方法,例如:execute(),queue(),observe()
  3. HystrixObservable 是為Observable設計的介面,主要提供自動訂閱(observe())和生成Observable(toObservable())的抽象方法
  4. HystrixInvokableInfo 提供大量的狀態查詢(獲取屬性配置,是否開啟斷路器等)
  5. AbstractCommand 核心邏輯的實現
  6. HystrixCommand 定製邏輯實現以及留給使用者實現的介面(比如:run())

樣例程式碼

通過新建一個 command 來看 Hystrix 是如何建立並執行的.HystrixCommand 是一個抽象類,其中有一個run方法需要我們實現自己的業務邏輯,以下是偷懶採用匿名內部類的形式呈現.構造方法的內部實現我們就不關注了,直接看下執行的邏輯吧.

HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey("demo-group")) {
            @Override
            protected String run() {
                return "Hello World~";
            }
        };
demo.execute();
複製程式碼

執行過程

流程圖

execute

這是官方給出的一次完整呼叫的鏈路.上述的 demo 中我們直接呼叫了execute方法,所以呼叫的路徑為execute() -> queue() -> toObservable() -> toBlocking() -> toFuture() -> get().核心的邏輯其實就在toObservable()中.

HystrixCommand.java

execute

execute方法為同步呼叫返回結果,並對異常作處理.內部會呼叫queue

// 同步呼叫執行
public R execute() {
  try {
    // queue()返回的是Future型別的物件,所以這裡是阻塞get
    return queue().get();
  } catch (Exception e) {
    throw decomposeException(e);
  }
}
複製程式碼
queue

queue的第一行程式碼完成了核心的訂閱邏輯.

  1. toObservable() 生成了 Hystrix 的 Observable 物件
  2. Observable 轉換為 BlockingObservable 可以阻塞控制資料傳送
  3. toFuture 實現對 BlockingObservable 的訂閱
public Future<R> queue() {
  // 著重關注的是這行程式碼
  // 完成了Observable的建立及訂閱
  // toBlocking()是將Observable轉為BlockingObservable,轉換後的Observable可以阻塞資料的傳送
  final Future<R> delegate = toObservable().toBlocking().toFuture();

  final Future<R> f = new Future<R>() {
    // 由於toObservable().toBlocking().toFuture()返回的Future如果中斷了,
    // 不會對當前執行緒進行中斷,所以這裡將返回的Future進行了再次包裝,處理異常邏輯
    ...
  }

  // 判斷是否已經結束了,有異常則直接丟擲
  if (f.isDone()) {
    try {
      f.get();
      return f;
    } catch (Exception e) {
			// 省略這段判斷
    }
  }

  return f;
}
複製程式碼

BlockingObservable.java

// 被包裝的Observable
private final Observable<? extends T> o;

// toBlocking()會呼叫該靜態方法將 源Observable簡單包裝成BlockingObservable
public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
  return new BlockingObservable<T>(o);
}

public Future<T> toFuture() {
  return BlockingOperatorToFuture.toFuture((Observable<T>)o);
}
複製程式碼

BlockingOperatorToFuture.java

ReactiveX 關於toFuture的解讀

The toFuture operator applies to the BlockingObservable subclass,so in order to use it,you must first convert your source Observable into a BlockingObservable by means of either the BlockingObservable.from method or the Observable.toBlocking operator.

toFuture只能作用於BlockingObservable所以也才會有上文想要轉換為BlockingObservable的操作

// 該操作將 源Observable轉換為返回單個資料項的Future
public static <T> Future<T> toFuture(Observable<? extends T> that) {
  	// CountDownLatch 判斷是否完成
    final CountDownLatch finished = new CountDownLatch(1);
  	// 儲存執行結果
    final AtomicReference<T> value = new AtomicReference<T>();
  	// 儲存錯誤結果
    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

  	// single()方法可以限制Observable只傳送單條資料
  	// 如果有多條資料 會拋 IllegalArgumentException
  	// 如果沒有資料可以傳送 會拋 NoSuchElementException
    @SuppressWarnings("unchecked")
    final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {
				// single()返回的Observable就可以對其進行標準的處理了
        @Override
        public void onCompleted() {
            finished.countDown();
        }

        @Override
        public void onError(Throwable e) {
            error.compareAndSet(null,e);
            finished.countDown();
        }

        @Override
        public void onNext(T v) {
            // "single" guarantees there is only one "onNext"
            value.set(v);
        }
    });
		
  	// 最後將Subscription返回的資料封裝成Future,實現對應的邏輯
    return new Future<T>() {
			// 可以檢視原始碼
    };

}
複製程式碼

AbstractCommand.java

AbstractCommandtoObservable實現的地方,屬於Hystrix的核心邏輯,程式碼較長,可以和方法呼叫的流程圖一起食用.toObservable主要是完成快取和建立Observable,requestLog的邏輯,當第一次建立Observable時,applyHystrixSemantics方法是Hystrix的語義實現,可以跳著看.

tips: 下文中有很多 Action和 Function,他們很相似,都有call方法,但是區別在於Function有返回值,而Action沒有,方法後跟著的數字代表有幾個入參.Func0/Func3即沒有入參和有三個入參

toObservable

toObservable程式碼較長且分層還是清晰的,所以下面一塊一塊寫.其邏輯和文章開始提到的Hystrix流程圖是完全一致的.

toObservable.png

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
  	// 此處省略掉了很多個Action和Function,大部分是來做掃尾清理的函式,所以用到的時候再說
  
  	// defer在上篇rxjava入門中提到過,是一種建立型的操作符,每次訂閱時會產生新的Observable,回撥方法中所實現的才是真正我們需要的Observable
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
          	
						// 校驗命令的狀態,保證其只執行一次
            if (!commandState.compareAndSet(CommandState.NOT_STARTED,CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION,_cmd.getClass(),getLogMessagePrefix() + " command executed multiple times - this is not permitted.",ex,null);
            }

            commandStartTimestamp = System.currentTimeMillis();
						// properties為當前command的所有屬性
          	// 允許記錄請求log時會儲存當前執行的command
            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }
						
          	// 是否開啟了請求快取
            final boolean requestCacheEnabled = isRequestCachingEnabled();
          	// 獲取快取key
            final String cacheKey = getCacheKey();

            // 開啟快取後,嘗試從快取中取
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
                }
            }
          	// 沒有開啟請求快取時,就執行正常的邏輯
            Observable<R> hystrixObservable =
              			// 這裡又通過defer建立了我們需要的Observable
                    Observable.defer(applyHystrixSemantics)
              							// 傳送前會先走一遍hook,預設executionHook是空實現的,所以這裡就跳過了
                            .map(wrapWithAllOnNextHooks);
          
            // 得到最後的封裝好的Observable後,將其放入快取
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable,_cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey,toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
              			// 終止時的操作
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line),or unsubscribe (next line))
              			// 取消訂閱時的操作
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
              			// 完成時的操作
                    .doOnCompleted(fireOnCompletedHook);
        }
    }
                     
複製程式碼
handleRequestCacheHitAndEmitValues

快取擊中時的處理

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache,final AbstractCommand<R> _cmd) {
        try {
          	// Hystrix中有大量的hook 如果有心做二次開發的,可以利用這些hook做到很完善的監控
            executionHook.onCacheHit(this);
        } catch (Throwable hookEx) {
            logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit",hookEx);
        }   
  // 將快取的結果賦給當前command
	return fromCache.toObservableWithStateCopiedInto(this)
    				// doOnTerminate 或者是後面看到的doOnUnsubscribe,doOnError,都指的是在響應onTerminate/onUnsubscribe/onError後的操作,即在Observable的生命週期上註冊一個動作優雅的處理邏輯
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                  	// 命令最終狀態的不同進行不同處理
                    if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED,CommandState.TERMINAL)) {
                        cleanUpAfterResponseFromCache(false); //user code never ran
                    } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED,CommandState.TERMINAL)) {
                        cleanUpAfterResponseFromCache(true); //user code did run
                    }
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
	                  // 命令最終狀態的不同進行不同處理
                    if (commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED,CommandState.UNSUBSCRIBED)) {
                        cleanUpAfterResponseFromCache(false); //user code never ran
                    } else if (commandState.compareAndSet(CommandState.USER_CODE_EXECUTED,CommandState.UNSUBSCRIBED)) {
                        cleanUpAfterResponseFromCache(true); //user code did run
                    }
                }
            });
}       
複製程式碼
applyHystrixSemantics

因為本片文章的主要目的是在講執行流程,所以失敗回退和斷路器相關的就留到以後的文章中再寫.

applyHystrixSemantics.png

final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
      	// 不再訂閱了就返回不傳送資料的Observable
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
          	// 不傳送任何資料或通知
            return Observable.never();
        }
        return applyHystrixSemantics(_cmd);
    }
};

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
	// 標記開始執行的hook
  // 如果hook內拋異常了,會快速失敗且沒有fallback處理
  executionHook.onStart(_cmd);

  /* determine if we're allowed to execute */
  // 斷路器核心邏輯: 判斷是否允許執行(TODO)
  if (circuitBreaker.allowRequest()) {
    // Hystrix自己造的訊號量輪子,之所以不用juc下,官方解釋為juc的Semphore實現太複雜,而且沒有動態調節的訊號量大小的能力,簡而言之,不滿足需求!
    // 根據不同隔離策略(執行緒池隔離/訊號量隔離)獲取不同的TryableSemphore
    final TryableSemaphore executionSemaphore = getExecutionSemaphore();
    // Semaphore釋放標誌
    final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
    
    // 釋放訊號量的Action
    final Action0 singleSemaphoreRelease = new Action0() {
      @Override
      public void call() {
        if (semaphoreHasBeenReleased.compareAndSet(false,true)) {
          executionSemaphore.release();
        }
      }
    };

    // 異常處理
    final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
      @Override
      public void call(Throwable t) {
        // HystrixEventNotifier是hystrix的外掛,不同的事件傳送不同的通知,預設是空實現.
        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN,commandKey);
      }
    };
		
    // 執行緒池隔離的TryableSemphore始終為true
    if (executionSemaphore.tryAcquire()) {
      try {
        /* used to track userThreadExecutionTime */
        // executionResult是一次命令執行的結果資訊封裝
        // 這裡設定起始時間是為了記錄命令的生命週期,執行過程中會set其他屬性進去
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
          // 報錯時的處理
          .doOnError(markExceptionThrown)
          // 終止時釋放
          .doOnTerminate(singleSemaphoreRelease)
          // 取消訂閱時釋放
          .doOnUnsubscribe(singleSemaphoreRelease);
      } catch (RuntimeException e) {
        return Observable.error(e);
      }
    } else {
      // tryAcquire失敗後會做fallback處理,TODO
      return handleSemaphoreRejectionViaFallback();
    }
  } else {
    // 斷路器短路(拒絕請求)fallback處理 TODO
    return handleShortCircuitViaFallback();
  }
}

複製程式碼
executeCommandAndObserve

executeCommandAndObserve.png

/**
 * 執行run方法的地方
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
  	// 獲取當前上下文
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

  	// 傳送資料時的Action響應
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
          	// 如果onNext時需要上報時,做以下處理
            if (shouldOutputOnNextEvents()) {
              	// result標記
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
              	// 通知
                eventNotifier.markEvent(HystrixEventType.EMIT,commandKey);
            }
          	// commandIsScalar是一個我不解的地方,在網上也沒有查到好的解釋
          	// 該方法為抽象方法,有HystrixCommand實現返回true.HystrixObservableCommand返回false
            if (commandIsScalar()) {
              	// 耗時
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
              	// 通知
                eventNotifier.markCommandExecution(getCommandKey(),properties.executionIsolationStrategy().get(),(int) latency,executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS,commandKey);
                executionResult = executionResult.addEvent((int) latency,HystrixEventType.SUCCESS);
              	// 斷路器標記成功(斷路器半開時的反饋,決定是否關閉斷路器)
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
							// 同markEmits 類似處理
            }
        }
    };

  	// 失敗回退的邏輯
    final Func1<Throwable,Observable<R>> handleFallback = new Func1<Throwable,Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
          // 不是重點略過了
        }
    };

  	// 請求上下文的處理
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };

    Observable<R> execution;
  	// 如果有執行超時限制,會將包裝後的Observable再轉變為支援TimeOut的
    if (properties.executionTimeoutEnabled().get()) {
      	// 根據不同的隔離策略包裝為不同的Observable
        execution = executeCommandWithSpecifiedIsolation(_cmd)
          			// lift 是rxjava中一種基本操作符 可以將Observable轉換成另一種Observable
          			// 包裝為帶有超時限制的Observable
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

複製程式碼
executeCommandWithSpecifiedIsolation

根據不同的隔離策略建立不同的執行Observable

executeCommandSpecfi.png

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
              	// 由於原始碼太長,這裡只關注正常的流程,需要詳細瞭解可以去看看原始碼
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD,ThreadState.STARTED)) {
                    try {
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed,so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }})
        .doOnTerminate(new Action0() {})
        .doOnUnsubscribe(new Action0() {})
        // 指定在某一個執行緒上執行,是rxjava中很重要的執行緒排程的概念
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
        }));
    } else { // 訊號量隔離策略
        return Observable.defer(new Func0<Observable<R>>() {
						// 邏輯與執行緒池大致相同
        });
    }
}

複製程式碼
getUserExecutionObservable

獲取使用者執行的邏輯

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
      	// getExecutionObservable是抽象方法,有HystrixCommand自行實現
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }
		// 將Observable作其他中轉
    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

複製程式碼

lift操作符

lift可以轉換成一個新的Observable,它很像一個代理,將原來的Observable代理到自己這裡,訂閱時通知原來的Observable傳送資料,經自己這裡流轉加工處理再返回給訂閱者.Map/FlatMap操作符底層其實就是用的lift進行實現的.

getExecutionObservable
@Override
final protected Observable<R> getExecutionObservable() {
  return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
      try {
        // just操作符就是直接執行的Observable
        // run方法就是我們實現的業務邏輯: Hello World~
        return Observable.just(run());
      } catch (Throwable ex) {
        return Observable.error(ex);
      }
    }
  }).doOnSubscribe(new Action0() {
    @Override
    public void call() {
     	// 執行訂閱時將執行執行緒記為當前執行緒,必要時我們可以interrupt
      executionThread.set(Thread.currentThread());
    }
  });
}

複製程式碼

總結

希望自己能把埋下的坑一一填完: 容錯機制,metrics,斷路器等等...

參考

  1. Hystrix How it Works
  2. ReactiveX官網
  3. 阮一峰: 中文技術檔案寫作規範
  4. RxJava lift 原理解析