
Hystrix Summary: Caching


  Request caching is enabled by overriding the getCacheKey method of HystrixCommand or HystrixObservableCommand.

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
    private final int value;

    protected CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    protected Boolean run() {
        return value == 0 || value % 2 == 0;
    }

    @Override
    protected String getCacheKey() {
        return String.valueOf(value);
    }
}

Executing the command

@Test
public void testWithoutCacheHits() {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        assertTrue(new CommandUsingRequestCache(2).execute());
        assertFalse(new CommandUsingRequestCache(1).execute());
        assertTrue(new CommandUsingRequestCache(0).execute());
        assertTrue(new CommandUsingRequestCache(58672).execute());
    } finally {
        context.shutdown();
    }
}

  Hystrix uses the value returned by getCacheKey to look up results in the cache. A cached value lives for the duration of one request.

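  In essence, toObservable adds two pieces of logic around the execution: before running, it tries to fetch the result from the cache, and after running, it stores the result in the cache.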

public Observable<R> toObservable() {
...
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
               ...
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;
                if (requestCacheEnabled && cacheKey != null) {
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    ...
                } else {
                    afterCache = hystrixObservable;
                }
...
            }
        });
    }
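
  The lookup-then-store flow above can be approximated without Hystrix. The following is a simplified sketch rather than Hystrix code: RequestCacheFlowSketch, execute, and runCount are hypothetical names, and a CompletableFuture stands in for the cached Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified model of the cache handling in toObservable (hypothetical names, not Hystrix code).
class RequestCacheFlowSketch<R> {
    // Stands in for the per-request map; a CompletableFuture plays the role of the cached Observable.
    private final ConcurrentHashMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();
    // Counts how many times the underlying work was actually started.
    final AtomicInteger runCount = new AtomicInteger();

    R execute(String cacheKey, Supplier<R> work) {
        boolean cachingEnabled = cacheKey != null; // assumption: a null key means "do not cache"
        if (cachingEnabled) {
            CompletableFuture<R> fromCache = cache.get(cacheKey);
            if (fromCache != null) {
                return fromCache.join(); // cache hit: the work is not run again
            }
        }
        CompletableFuture<R> toCache = new CompletableFuture<>();
        if (cachingEnabled) {
            // Publish the placeholder before running, so concurrent callers share one execution.
            CompletableFuture<R> existing = cache.putIfAbsent(cacheKey, toCache);
            if (existing != null) {
                return existing.join(); // another caller won the race; reuse its result
            }
        }
        runCount.incrementAndGet();
        try {
            toCache.complete(work.get());
        } catch (RuntimeException e) {
            // Complete the placeholder so that waiting callers are not left blocked.
            toCache.completeExceptionally(e);
        }
        return toCache.join();
    }
}
```

  Calling execute twice with the same non-null key runs the supplier once. With a null key, every call runs it again.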

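  Internally, HystrixRequestCache builds the cache key from three parts: a type (the kind of command: 1 for HystrixCommandKey, 2 for HystrixCollapserKey), the name of the command key, and the HystrixConcurrencyStrategy. ValueCacheKey then pairs that key with the string returned by getCacheKey.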

private static class RequestCacheKey {
        private final short type; 
        private final String key;
        private final HystrixConcurrencyStrategy concurrencyStrategy;
...
}
 private static class ValueCacheKey {
        private final RequestCacheKey rvKey;
        private final String valueCacheKey;
...
}
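
  To illustrate why the key has two levels, here is a minimal sketch with value-based equals and hashCode. OwnerKeySketch and EntryKeySketch are hypothetical stand-ins for RequestCacheKey and ValueCacheKey. The concurrency strategy field is left out for brevity, and the type values follow the description above.

```java
import java.util.Objects;

// Hypothetical stand-in for RequestCacheKey: command type plus command name.
final class OwnerKeySketch {
    final short type;  // 1 = command, 2 = collapser, as described in the text
    final String name;

    OwnerKeySketch(short type, String name) {
        this.type = type;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OwnerKeySketch)) return false;
        OwnerKeySketch other = (OwnerKeySketch) o;
        return type == other.type && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, name);
    }
}

// Hypothetical stand-in for ValueCacheKey: owner key plus the string from getCacheKey().
final class EntryKeySketch {
    final OwnerKeySketch owner;
    final String valueCacheKey;

    EntryKeySketch(OwnerKeySketch owner, String valueCacheKey) {
        this.owner = owner;
        this.valueCacheKey = valueCacheKey;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EntryKeySketch)) return false;
        EntryKeySketch other = (EntryKeySketch) o;
        return owner.equals(other.owner) && valueCacheKey.equals(other.valueCacheKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(owner, valueCacheKey);
    }
}
```

  With keys of this shape, a command and a collapser that share a name and a cache key string still map to different entries.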

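  Each cache value is an Observable wrapped in a HystrixCachedObservable. The wrapper subscribes a ReplaySubject to the original Observable, so later subscribers replay the same result instead of running the command again.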

protected HystrixCachedObservable(final Observable<R> originalObservable) {
        ReplaySubject<R> replaySubject = ReplaySubject.create();
        this.originalSubscription = originalObservable
                .subscribe(replaySubject);

        this.cachedObservable = replaySubject
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions--;
                        if (outstandingSubscriptions == 0) {
                            originalSubscription.unsubscribe();
                        }
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstandingSubscriptions++;
                    }
                });
    }
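
  The replay-and-count behavior in the constructor above can be modeled in plain Java. This is a single-threaded sketch under stated assumptions: ReplayBufferSketch is a hypothetical class, not RxJava's ReplaySubject, and cancelSource is a hypothetical callback standing in for originalSubscription.unsubscribe().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Single-threaded sketch of replay caching (hypothetical class, not RxJava's ReplaySubject).
class ReplayBufferSketch<T> {
    private final List<T> emitted = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private final Runnable cancelSource; // invoked when the last subscriber leaves
    private int outstandingSubscriptions = 0;

    ReplayBufferSketch(Runnable cancelSource) {
        this.cancelSource = cancelSource;
    }

    // Called by the one real execution for each value it produces.
    void onNext(T value) {
        emitted.add(value);
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }

    // Late subscribers first receive everything already emitted, then live values.
    // Returns a handle that unsubscribes this subscriber.
    Runnable subscribe(Consumer<T> subscriber) {
        outstandingSubscriptions++;
        for (T value : emitted) {
            subscriber.accept(value);
        }
        subscribers.add(subscriber);
        return () -> {
            if (subscribers.remove(subscriber)) {
                outstandingSubscriptions--;
                if (outstandingSubscriptions == 0) {
                    cancelSource.run(); // nobody is listening any more
                }
            }
        };
    }
}
```

  In this sketch the source is cancelled only after every subscriber has unsubscribed, which mirrors the outstandingSubscriptions counter in the original code.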

  HystrixRequestCache keeps its entries in a Map. That Map is stored in a HystrixRequestVariableHolder, so the cache lives only for the duration of a single request.

private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache =
        new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(
                new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {
                    ...
                });

<T> HystrixCachedObservable<T> get(String cacheKey) {
        ValueCacheKey key = getRequestCacheKey(cacheKey);
        if (key != null) {
            ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
            if (cacheInstance == null) {
                throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
            }
            /* look for the stored value */
            return (HystrixCachedObservable<T>) cacheInstance.get(key);
        }
        return null;
    }
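
  The request-scoped lifetime can be pictured with a small model. The sketch below assumes the request context is bound to the current thread through a ThreadLocal. RequestScopeSketch and its methods are hypothetical names and only mimic the initialize, use, and shutdown cycle.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a request-scoped cache holder (not the Hystrix classes).
final class RequestScopeSketch {
    // One map per thread-bound "request"; null means no request context is active.
    private static final ThreadLocal<ConcurrentHashMap<String, Object>> CURRENT = new ThreadLocal<>();

    private RequestScopeSketch() {
    }

    // Start a request: create a fresh, empty cache.
    static void initializeContext() {
        CURRENT.set(new ConcurrentHashMap<>());
    }

    // End the request: drop the cache and everything in it.
    static void shutdown() {
        CURRENT.remove();
    }

    // Access the cache of the current request, failing loudly if none was initialized.
    static ConcurrentHashMap<String, Object> cache() {
        ConcurrentHashMap<String, Object> map = CURRENT.get();
        if (map == null) {
            throw new IllegalStateException("No request context; call initializeContext() first");
        }
        return map;
    }
}
```

  Entries stored between initializeContext() and shutdown() are not visible to the next request, and using the cache outside a request fails in the same way as the IllegalStateException in the code above.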

Are exceptions cached as well?

  Yes. An exception thrown by the fallback is cached as well.
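
  One way to picture this: the cache stores the outcome of the execution, not only a successful value. The sketch below is an illustration under that assumption, using a CompletableFuture instead of an Observable. FailureCachingSketch, load, and runCount are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: a failed outcome is cached just like a successful one.
class FailureCachingSketch {
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    // Counts how many times the failing work was actually executed.
    final AtomicInteger runCount = new AtomicInteger();

    CompletableFuture<String> load(String cacheKey) {
        return cache.computeIfAbsent(cacheKey, key -> {
            runCount.incrementAndGet();
            CompletableFuture<String> failed = new CompletableFuture<>();
            // Simulate a command whose fallback also fails.
            failed.completeExceptionally(new IllegalStateException("fallback failed for key " + key));
            return failed;
        });
    }
}
```

  A second load with the same key returns the same failed future without running the work again.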

What is the scope of the cache?

  A single request: entries are shared only within the same HystrixRequestContext.
