HystrixCircuitBreakerImpl熔斷器之滑動視窗
阿新 • • 發佈:2018-12-12
這裡主要介紹熔斷器在關閉狀態下,內部是如何不斷地檢查命令的執行結果並相應地更改狀態的。其實就是利用RxJava實現了一個滑動視窗,下面就在原始碼中看看是怎麼回事。
1、先從整體上看看HystrixCircuitBreakerImpl
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; //配置:包括了滑動視窗的設定 private final HystrixCommandMetrics metrics; //命令執行記錄的資料流 enum Status { CLOSED, OPEN, HALF_OPEN; //熔斷器狀態,分別對應:關閉、開啟、半開 } private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED); private final AtomicLong circuitOpened = new AtomicLong(-1); private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream(); activeSubscription.set(s); } private Subscription subscribeToStream() { /* * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream */ return metrics.getHealthCountsStream() .observe() // 滑動視窗在裡面,滑動視窗在裡面,滑動視窗在裡面!!! .subscribe(new Subscriber<HealthCounts>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(HealthCounts hc) { // 根據滑動窗口裡的結果更改熔斷器狀態 // check if we are past the statisticalWindowVolumeThreshold if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { //we are not past the minimum error threshold for the stat window, // so no change to circuit status. 
// if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { // our failure rate is too high, we need to set the state to OPEN if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); } @Override public void markSuccess() { // 這個是根據一個命令的實時執行結果,成功則將熔斷器由半開狀態變為關閉狀態 if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { //This thread wins the race to close the circuit - it resets the stream to start it over from 0 metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } } @Override public void markNonSuccess() {// 這個是根據一個命令的實時執行結果,失敗則將熔斷器由半開狀態變為開啟狀態 if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { //This thread wins the race to re-open the circuit - it resets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); } } @Override public boolean isOpen() { if (properties.circuitBreakerForceOpen().get()) { return true; } if (properties.circuitBreakerForceClosed().get()) { return false; } return circuitOpened.get() >= 0; } @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { if (status.get().equals(Status.HALF_OPEN)) { return false; } else { return isAfterSleepWindow(); } } } private boolean isAfterSleepWindow() { // 判斷熔斷器是否可進入半開狀態 final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); return currentTime > circuitOpenTime + 
sleepWindowTime; } @Override public boolean attemptExecution() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { if (isAfterSleepWindow()) { if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { //only the first request after sleep window should execute return true; } else { return false; } } else { return false; } } } }
進入observe(),可以看到它返回的是sourceStream,這是一個Observable。在當前類的建構函式中可以看到,sourceStream又來源於bucketedStream.window(numBuckets, 1)……,程式碼如下:
this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //滑動視窗,滑動視窗,滑動視窗 emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } }) .share() //multiple subscribers should get same data .onBackpressureDrop(); //if there are slow consumers, data should not buffer
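進入bucketedStream所在類的建構函式,可以看到如下程式碼(注意:這裡的window(bucketSizeInMs, TimeUnit.MILLISECONDS)是按固定時間長度把事件切分成互不重疊的桶;真正讓視窗「滑動」的是上面的window(numBuckets, 1)):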
// Defers building the bucket stream until subscription; each emitted Bucket summarises one time slice.
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                // fixed-duration time window: bucket it by the counter window so we can emit to the
                // next operator in time chunks, not on every OnNext
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                // for a given bucket, turn it into a long array containing counts of event types
                .flatMap(reduceBucketToSummary)
                // start it with empty arrays to make consumer logic as generic as possible
                // (windows are always full)
                .startWith(emptyEventCountsToStart);
    }
});