/**
 * Circuit-breaker implementation that keeps a CLOSED / OPEN / HALF_OPEN status, updated
 * from the command health stream and from the results of individual command executions.
 * NOTE(review): this listing looks abridged - several closing braces and bodies are not shown.
 */
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
	private final HystrixCommandProperties properties; // configuration, including the sliding-window settings
	private final HystrixCommandMetrics metrics; // data stream of command execution records

	enum Status {
		CLOSED, OPEN, HALF_OPEN; // circuit-breaker states: closed, open, half-open

	// Current breaker state; starts as CLOSED and, in the visible code, is only changed via compareAndSet.
	private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
	// -1 is the sentinel that allowRequest()/attemptExecution() treat as "circuit not opened".
	// isAfterSleepWindow() adds the sleep window to this value and compares the sum with
	// System.currentTimeMillis(), so it presumably holds the open time in epoch millis -
	// the write itself is not shown in this listing; TODO confirm.
	private final AtomicLong circuitOpened = new AtomicLong(-1);
	// Holds a stream Subscription (initially null); markSuccess() reads it before re-subscribing.
	private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

	/**
	 * Stores the supplied properties and metrics, then subscribes to the health stream.
	 *
	 * @param key          command key (not used in the lines visible here)
	 * @param commandGroup command group key (not used in the lines visible here)
	 * @param properties   command configuration read by the threshold and sleep-window checks
	 * @param metrics      source of the health-counts stream
	 */
	protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
		this.properties = properties;
		this.metrics = metrics;

		//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
		// NOTE(review): what happens to `s` afterwards is not shown; presumably it is stored in activeSubscription - confirm.
		Subscription s = subscribeToStream();

	/**
	 * Subscribes to {@code metrics.getHealthCountsStream().observe()} and returns the Subscription.
	 * On each HealthCounts item the subscriber leaves the status alone when the request volume or
	 * the error percentage is below its configured threshold; otherwise it attempts CLOSED -> OPEN.
	 *
	 * @return the subscription to the health-counts stream
	 */
	private Subscription subscribeToStream() {
		// NOTE(review): the next line looks like the remnant of a block comment whose delimiters were lost in this listing.
		 * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
		return metrics.getHealthCountsStream()
				.observe() // the sliding window is implemented inside this call
				.subscribe(new Subscriber<HealthCounts>() {
					public void onCompleted() {


					public void onError(Throwable e) {


					public void onNext(HealthCounts hc) { // update the circuit-breaker status from the sliding-window result
						// check if we are past the statisticalWindowVolumeThreshold
						if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
							// we are not past the minimum volume threshold for the stat window,
							// so no change to circuit status.
							// if it was CLOSED, it stays CLOSED
							// if it was half-open, we need to wait for a successful command execution
							// if it was open, we need to wait for sleep window to elapse
						} else {
							if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
								//we are not past the minimum error threshold for the stat window,
								// so no change to circuit status.
								// if it was CLOSED, it stays CLOSED
								// if it was half-open, we need to wait for a successful command execution
								// if it was open, we need to wait for sleep window to elapse
							} else {
								// our failure rate is too high, we need to set the state to OPEN
								// only a breaker that is currently CLOSED is moved to OPEN here; the body of this branch is not shown
								if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

	/**
	 * Reacts to a successful command execution: if the breaker is HALF_OPEN it is moved to CLOSED,
	 * and the winning thread re-subscribes to the health stream. Any other status is left untouched.
	 */
	public void markSuccess() { // driven by a command's real-time result: on success, HALF_OPEN becomes CLOSED
		if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
			//This thread wins the race to close the circuit - it resets the stream to start it over from 0
			Subscription previousSubscription = activeSubscription.get();
			// NOTE(review): the body of this null check is not shown; presumably it unsubscribes the old subscription - confirm.
			if (previousSubscription != null) {
			Subscription newSubscription = subscribeToStream();

	/**
	 * Reacts to a failed command execution: if the breaker is HALF_OPEN it is moved back to OPEN.
	 * Any other status is left untouched by the compareAndSet.
	 */
	public void markNonSuccess() {// driven by a command's real-time result: on failure, HALF_OPEN becomes OPEN
		if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
			//This thread wins the race to re-open the circuit - it resets the start time for the sleep window

	/**
	 * Reports whether the circuit is open.
	 * The force-open property is checked first, then force-closed; only when neither is set
	 * does the result depend on {@code circuitOpened}.
	 *
	 * @return true when force-open is set; false when force-closed is set; otherwise
	 *         whether {@code circuitOpened} is non-negative (-1 being its initial value)
	 */
	public boolean isOpen() {
		if (properties.circuitBreakerForceOpen().get()) {
			return true;
		if (properties.circuitBreakerForceClosed().get()) {
			return false;
		return circuitOpened.get() >= 0;

	/**
	 * Decides whether a request may pass, using only reads of the breaker state
	 * (no compareAndSet appears in this method).
	 *
	 * @return false when force-open is set; true when force-closed is set or
	 *         {@code circuitOpened} is -1; false while the status is HALF_OPEN;
	 *         otherwise the result of {@link #isAfterSleepWindow()}
	 */
	public boolean allowRequest() {
		if (properties.circuitBreakerForceOpen().get()) {
			return false;
		if (properties.circuitBreakerForceClosed().get()) {
			return true;
		// -1 is the initial value of circuitOpened, i.e. the circuit has not been opened
		if (circuitOpened.get() == -1) {
			return true;
		} else {
			if (status.get().equals(Status.HALF_OPEN)) {
				return false;
			} else {
				return isAfterSleepWindow();

	/**
	 * Checks whether the configured sleep window has elapsed since the value held in
	 * {@code circuitOpened}.
	 *
	 * @return true when the current wall-clock time in milliseconds is strictly greater than
	 *         {@code circuitOpened + circuitBreakerSleepWindowInMilliseconds}
	 */
	private boolean isAfterSleepWindow() { // decides whether the breaker may enter the half-open state
		final long circuitOpenTime = circuitOpened.get();
		final long currentTime = System.currentTimeMillis();
		final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
		return currentTime > circuitOpenTime + sleepWindowTime;

	/**
	 * Decides whether the caller may execute and, unlike {@code allowRequest()}, may change state:
	 * once the sleep window has elapsed, it attempts the OPEN -> HALF_OPEN transition.
	 *
	 * @return false when force-open is set; true when force-closed is set or
	 *         {@code circuitOpened} is -1; after the sleep window, true only for the caller
	 *         whose compareAndSet(OPEN, HALF_OPEN) succeeds; false otherwise
	 */
	public boolean attemptExecution() {
		if (properties.circuitBreakerForceOpen().get()) {
			return false;
		if (properties.circuitBreakerForceClosed().get()) {
			return true;
		// -1 is the initial value of circuitOpened, i.e. the circuit has not been opened
		if (circuitOpened.get() == -1) {
			return true;
		} else {
			if (isAfterSleepWindow()) {
				if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
					//only the first request after sleep window should execute
					return true;
				} else {
					return false;
			} else {
				return false;

        進入 observe() 可以看到,它返回的是 sourceStream,也就是一個 Observable。在當前類的建構函式中可以看到,sourceStream 又來源於 bucketedStream.window(numBuckets, 1):

 // Builds sourceStream from bucketedStream: overlapping windows of numBuckets buckets (advancing
 // one bucket at a time) are each reduced to a single summary, and the result is shared.
 // NOTE(review): the bodies of the doOnSubscribe/doOnUnsubscribe actions are not shown in this listing.
 this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //the sliding window: emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    public void call() {
                .doOnUnsubscribe(new Action0() {
                    public void call() {
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer


// Builds bucketedStream lazily via Observable.defer: the input event stream is cut into
// time windows of bucketSizeInMs milliseconds, each reduced to one bucket summary.
// NOTE(review): the closing braces of this statement are not shown in this listing.
this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
	public Observable<Bucket> call() {
		return inputEventStream
				.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //time buckets feeding the sliding window: bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
				.flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
				.startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)