Hystrix Summary: Caching
阿新 • Published: 2018-01-11
Request caching is enabled by implementing the getCacheKey method of HystrixCommand or HystrixObservableCommand.
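import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class CommandUsingRequestCache extends HystrixCommand<Boolean> {

    private final int value;

    protected CommandUsingRequestCache(int value) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.value = value;
    }

    @Override
    protected Boolean run() {
        // true for zero and for every even number
        return value == 0 || value % 2 == 0;
    }

    // Commands that return the same key share one cached result within a request.
    @Override
    protected String getCacheKey() {
        return String.valueOf(value);
    }
}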
Executing the command:
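// Needs JUnit 4 (org.junit.Test, static org.junit.Assert.*) and
// com.netflix.hystrix.strategy.concurrency.HystrixRequestContext.
@Test
public void testWithoutCacheHits() {
    // The request cache only works inside an initialized request context.
    HystrixRequestContext context = HystrixRequestContext.initializeContext();
    try {
        // Four different values, so four different cache keys: no cache hits.
        assertTrue(new CommandUsingRequestCache(2).execute());
        assertFalse(new CommandUsingRequestCache(1).execute());
        assertTrue(new CommandUsingRequestCache(0).execute());
        assertTrue(new CommandUsingRequestCache(58672).execute());
    } finally {
        context.shutdown();
    }
}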
Hystrix uses the key returned by getCacheKey to look up values in the cache. A cached value lives for the duration of one request.
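In essence, toObservable wraps execution with cache logic. Before running, it looks up the cache key and returns the cached response on a hit. On a miss, it wraps the command's Observable and stores it in the cache with putIfAbsent, so that later commands with the same key reuse its result.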
public Observable<R> toObservable() {
    ...
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            ...
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache =
                        (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            if (requestCacheEnabled && cacheKey != null) {
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache =
                        (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                ...
            } else {
                afterCache = hystrixObservable;
            }
            ...
        }
    });
}
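The lookup-then-putIfAbsent flow above can be sketched in plain Java. This is only an illustration under stated assumptions: LookupThenPutIfAbsentSketch and its execute method are hypothetical names, and a CompletableFuture stands in for the cached Observable. None of it is Hystrix API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the request cache logic; not a Hystrix class.
final class LookupThenPutIfAbsentSketch {
    private final ConcurrentHashMap<String, CompletableFuture<Boolean>> cache =
            new ConcurrentHashMap<>();
    // Counts how often the real work actually ran.
    final AtomicInteger executions = new AtomicInteger();

    CompletableFuture<Boolean> execute(String cacheKey, Supplier<Boolean> run) {
        // Assumption: caching is on whenever a non-null key is supplied.
        boolean cachingEnabled = cacheKey != null;

        // Step 1: before executing, try the cache.
        if (cachingEnabled) {
            CompletableFuture<Boolean> fromCache = cache.get(cacheKey);
            if (fromCache != null) {
                return fromCache;
            }
        }

        // Step 2: on a miss, publish a placeholder before doing the work,
        // so a concurrent caller with the same key reuses it.
        CompletableFuture<Boolean> toCache = new CompletableFuture<>();
        if (cachingEnabled) {
            CompletableFuture<Boolean> existing = cache.putIfAbsent(cacheKey, toCache);
            if (existing != null) {
                return existing; // another caller won the race
            }
        }

        // Step 3: run the work and complete the (possibly cached) future.
        executions.incrementAndGet();
        try {
            toCache.complete(run.get());
        } catch (RuntimeException e) {
            toCache.completeExceptionally(e);
        }
        return toCache;
    }

    public static void main(String[] args) {
        LookupThenPutIfAbsentSketch sketch = new LookupThenPutIfAbsentSketch();
        sketch.execute("2", () -> true).join();
        sketch.execute("2", () -> true).join(); // same key: served from the cache
        System.out.println("executions = " + sketch.executions.get());
    }
}
```

Using putIfAbsent rather than put means that when two callers miss at the same time, only one of them ends up executing; the other receives the entry that was stored first.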
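Internally, HystrixRequestCache builds the cache key from three parts: a type (the kind of key: 1 for HystrixCommandKey, 2 for HystrixCollapserKey), the name of the commandKey, and the HystrixConcurrencyStrategy. The value returned by getCacheKey is then combined with this key.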
private static class RequestCacheKey {
    private final short type;
    private final String key;
    private final HystrixConcurrencyStrategy concurrencyStrategy;
    ...
}

private static class ValueCacheKey {
    private final RequestCacheKey rvKey;
    private final String valueCacheKey;
    ...
}
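To see why the key is composite, here is a minimal sketch of a two-level key with value-based equals and hashCode. The class and factory names (CompositeCacheKeySketch, ScopeKey, ValueKey, commandKey, collapserKey) are hypothetical, and the concurrency strategy component is left out to keep the example small.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of a composite cache key; not Hystrix code.
final class CompositeCacheKeySketch {
    static final short COMMAND = 1;
    static final short COLLAPSER = 2;

    // Identifies which command (or collapser) the cache entry belongs to.
    static final class ScopeKey {
        final short type;
        final String name;

        ScopeKey(short type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ScopeKey)) return false;
            ScopeKey other = (ScopeKey) o;
            return type == other.type && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, name);
        }
    }

    // Adds the per-invocation key (what getCacheKey would return).
    static final class ValueKey {
        final ScopeKey scope;
        final String valueCacheKey;

        ValueKey(ScopeKey scope, String valueCacheKey) {
            this.scope = scope;
            this.valueCacheKey = valueCacheKey;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ValueKey)) return false;
            ValueKey other = (ValueKey) o;
            return scope.equals(other.scope) && valueCacheKey.equals(other.valueCacheKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(scope, valueCacheKey);
        }
    }

    static ValueKey commandKey(String name, String valueCacheKey) {
        return new ValueKey(new ScopeKey(COMMAND, name), valueCacheKey);
    }

    static ValueKey collapserKey(String name, String valueCacheKey) {
        return new ValueKey(new ScopeKey(COLLAPSER, name), valueCacheKey);
    }

    public static void main(String[] args) {
        Map<ValueKey, String> cache = new HashMap<>();
        cache.put(commandKey("GetUser", "42"), "user-42");
        // Same value key, different command name: no collision.
        System.out.println(cache.containsKey(commandKey("GetOrder", "42")));
    }
}
```

Because the command name and type are part of the key, two different commands can both return "42" from getCacheKey without reading each other's cached results.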
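The value stored in the cache is an Observable, wrapped in a HystrixCachedObservable. The wrapper subscribes a ReplaySubject to the original Observable, so every later subscriber replays the same events instead of triggering a new execution.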
protected HystrixCachedObservable(final Observable<R> originalObservable) {
    ReplaySubject<R> replaySubject = ReplaySubject.create();
    this.originalSubscription = originalObservable
            .subscribe(replaySubject);

    this.cachedObservable = replaySubject
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    outstandingSubscriptions--;
                    if (outstandingSubscriptions == 0) {
                        originalSubscription.unsubscribe();
                    }
                }
            })
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    outstandingSubscriptions++;
                }
            });
}
HystrixRequestCache uses a Map as the cache. This Map is held in a HystrixRequestVariableHolder, so its lifetime is limited to a single request.
private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache =
        new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(
                new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {
                    ...
                });
<T> HystrixCachedObservable<T> get(String cacheKey) {
    ValueCacheKey key = getRequestCacheKey(cacheKey);
    if (key != null) {
        ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance =
                requestVariableForCache.get(concurrencyStrategy);
        if (cacheInstance == null) {
            throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
        }
        /* look for the stored value */
        return (HystrixCachedObservable<T>) cacheInstance.get(key);
    }
    return null;
}
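The request-scoped lifetime can be illustrated with a small sketch. It assumes a ThreadLocal as the holder, and the names RequestScopeSketch, initializeContext, shutdown, and cache are hypothetical; they only mirror the idea of an initialize/shutdown pair around a per-request map.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical request-scoped holder; a simplified analogy, not Hystrix code.
final class RequestScopeSketch {
    // One map per thread, present only between initializeContext() and shutdown().
    private static final ThreadLocal<ConcurrentHashMap<String, Object>> CURRENT =
            new ThreadLocal<>();

    static void initializeContext() {
        CURRENT.set(new ConcurrentHashMap<>());
    }

    static void shutdown() {
        // Dropping the map discards every value cached during the request.
        CURRENT.remove();
    }

    static ConcurrentHashMap<String, Object> cache() {
        ConcurrentHashMap<String, Object> map = CURRENT.get();
        if (map == null) {
            throw new IllegalStateException(
                    "Request caching is not available; initialize the request context first");
        }
        return map;
    }

    public static void main(String[] args) {
        initializeContext();
        try {
            cache().put("2", Boolean.TRUE);
            System.out.println("cached within request: " + cache().get("2"));
        } finally {
            shutdown();
        }

        // A new request starts with an empty cache.
        initializeContext();
        try {
            System.out.println("cached in next request: " + cache().get("2"));
        } finally {
            shutdown();
        }
    }
}
```

As in the get method above, using the cache outside an initialized context fails fast with an IllegalStateException rather than silently skipping the cache.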
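Are exceptions cached as well?
Yes. The cache stores the Observable rather than a plain value, so a terminal error is replayed to later callers too. This includes an exception thrown by the fallback.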
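The effect of caching a failed result can be sketched as follows. This is an analogy under stated assumptions: a CompletableFuture completed exceptionally stands in for an Observable that ended with an error, and CachedFailureSketch and its execute method are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a cached failure; not Hystrix code.
final class CachedFailureSketch {
    private final ConcurrentHashMap<String, CompletableFuture<String>> cache =
            new ConcurrentHashMap<>();
    // Counts how often the failing work actually ran.
    final AtomicInteger attempts = new AtomicInteger();

    CompletableFuture<String> execute(String key) {
        CompletableFuture<String> cached = cache.get(key);
        if (cached != null) {
            return cached; // may already hold a failure
        }
        CompletableFuture<String> fresh = new CompletableFuture<>();
        CompletableFuture<String> raced = cache.putIfAbsent(key, fresh);
        if (raced != null) {
            return raced;
        }
        attempts.incrementAndGet();
        // Simulate a command whose run and fallback both fail.
        fresh.completeExceptionally(new IllegalStateException("run and fallback both failed"));
        return fresh;
    }

    public static void main(String[] args) {
        CachedFailureSketch sketch = new CachedFailureSketch();
        for (int i = 0; i < 2; i++) {
            try {
                sketch.execute("2").join();
            } catch (CompletionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        }
        System.out.println("attempts = " + sketch.attempts.get());
    }
}
```

The practical consequence is that a second command with the same key does not retry within the same request; it sees the same failure as the first.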
What is the scope of the cache?
A single request: cached values are only visible within the same request.