(15) Soul's Resilience4j Plugin: Hands-on and Principles (Part 2)
Goals
- Walk through the core source code of Resilience4JPlugin
Resilience4JPlugin core source code
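1: doExecute: retrieves the SoulContext from the exchange attributes.
2: Resilience4JHandle is the parameter object for Resilience4j. It is deserialized from the JSON in the rule's handle field, so it carries the parameters we configured in the admin console.
3: circuitEnable controls whether circuit breaking is enabled. When it is 0 (disabled), the request goes straight to the rateLimiter method. If an exception occurs during processing, the fallback is executed; if no fallback is configured, the exception is thrown directly.
4: When circuitEnable is 1 (enabled), the request goes through the combined method.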
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
    if (resilience4JHandle.getCircuitEnable() == 1) {
        return combined(exchange, chain, rule);
    }
    return rateLimiter(exchange, chain, rule);
}

private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
    return ratelimiterExecutor.run(
            chain.execute(exchange),
            fallback(ratelimiterExecutor, exchange, null),
            Resilience4JBuilder.build(rule))
        .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
}
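The fallback rule from step 3 (use the fallback on error if one is configured, otherwise let the error propagate) can be illustrated without Reactor. The following is only a plain-JDK analogy: CompletableFuture stands in for Mono, and FallbackSketch and runWithFallback are hypothetical names that do not exist in Soul.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-JDK analogy of "run the call, fall back on error if a fallback exists".
// CompletableFuture stands in for Reactor's Mono; all names are hypothetical.
final class FallbackSketch {

    private FallbackSketch() {
    }

    static <T> CompletableFuture<T> runWithFallback(
            final CompletableFuture<T> call,
            final Function<Throwable, T> fallback) {
        if (fallback == null) {
            // No fallback configured: the original failure reaches the caller.
            return call;
        }
        // Fallback configured: a failure is replaced by the fallback's value.
        return call.exceptionally(fallback);
    }
}
```

With a fallback, a failed call completes normally with the fallback value; with null, joining the returned future surfaces the original exception.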
- Analysis of Resilience4JPlugin#combined
private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
    Resilience4JConf conf = Resilience4JBuilder.build(rule);
    return combinedExecutor.run(
            chain.execute(exchange).doOnSuccess(v -> {
                if (exchange.getResponse().getStatusCode() != HttpStatus.OK) {
                    HttpStatus status = exchange.getResponse().getStatusCode();
                    exchange.getResponse().setStatusCode(null);
                    throw new CircuitBreakerStatusCodeException(status);
                }
            }),
            fallback(combinedExecutor, exchange, conf.getFallBackUri()),
            conf);
}
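1: Resilience4JConf conf = Resilience4JBuilder.build(rule): builds the CircuitBreakerConfig (circuit breaker), TimeLimiterConfig (time limiter), and RateLimiterConfig (rate limiter) from the rule configuration. The corresponding dependencies are declared in the pom.xml of soul-plugin-resilience4j: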
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>${resilience.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-timelimiter</artifactId>
<version>${resilience.version}</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
<version>${resilience.version}</version>
</dependency>
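In combined, the doOnSuccess hook turns any non-200 response into a CircuitBreakerStatusCodeException, presumably so that the circuit breaker counts it as a failure just as it would an exception. The sketch below illustrates that idea with a deliberately simplified breaker that opens after a number of consecutive failures. It is not how Resilience4j works internally (the real CircuitBreaker uses a sliding window, a failure-rate threshold, and a half-open state), and StatusCodeBreakerSketch and its members are hypothetical names.

```java
import java.util.function.IntSupplier;

// Simplified, single-threaded illustration; every name here is hypothetical.
// Resilience4j's real CircuitBreaker uses a sliding window, a failure-rate
// threshold and a half-open state, none of which are modelled here.
final class StatusCodeBreakerSketch {

    enum State { CLOSED, OPEN }

    static final int REJECTED = -1;

    private final int failureThreshold;
    private int consecutiveFailures;
    private State state = State.CLOSED;

    StatusCodeBreakerSketch(final int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    State state() {
        return state;
    }

    // Calls the upstream unless the breaker is open. Returns the upstream's
    // HTTP status code, or REJECTED when the call was short-circuited.
    int call(final IntSupplier upstream) {
        if (state == State.OPEN) {
            return REJECTED;
        }
        final int status = upstream.getAsInt();
        if (status == 200) {
            consecutiveFailures = 0;
        } else {
            // A non-200 response is counted as a failure.
            consecutiveFailures++;
            if (consecutiveFailures >= failureThreshold) {
                state = State.OPEN;
            }
        }
        return status;
    }
}
```

Once the threshold is reached the breaker stays open in this sketch, and further calls are rejected without touching the upstream.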
- CombinedExecutor#run: run holds the core processing logic of the Resilience4j plugin; this is where the resilience behavior is added to Soul
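1: Both the RateLimiter and the CircuitBreaker objects are created by the Resilience4JRegistryFactory factory, which maintains two fields: RateLimiterRegistry and CircuitBreakerRegistry. CircuitBreakerRegistry is backed by a ConcurrentHashMap; it is thread-safe and its operations are atomic. Developers can use CircuitBreakerRegistry to create and retrieve CircuitBreaker instances. RateLimiterRegistry works in a similar way.
2: run.transformDeferred: applies a Reactor (reactive programming) transformation to the Mono.
3: The steps are applied in order: the circuit breaker, then the rate limiter, then the timeout. If the timeout elapses, a timeout exception is raised and reported to the circuit breaker as an error.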
public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
    RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
    CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
    // apply the circuit breaker
    Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
            // apply the rate limiter
            .transformDeferred(RateLimiterOperator.of(rateLimiter))
            // set the timeout
            .timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
            // on timeout, report the TimeoutException to the circuit breaker as an error
            .doOnError(TimeoutException.class, t -> circuitBreaker.onError(
                    resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
                    TimeUnit.MILLISECONDS,
                    t));
    if (fallback != null) {
        to = to.onErrorResume(fallback);
    }
    return to;
}
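The get-or-create lookup that run performs through Resilience4JRegistryFactory can be pictured as a ConcurrentHashMap keyed by rule id. The following is a minimal sketch of that idea only, not Soul's or Resilience4j's actual registry code; RegistrySketch, its type parameters, and its factory function are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical get-or-create registry: C is the config type, V the instance type.
final class RegistrySketch<C, V> {

    private final ConcurrentMap<String, V> instances = new ConcurrentHashMap<>();
    private final Function<C, V> factory;

    RegistrySketch(final Function<C, V> factory) {
        this.factory = factory;
    }

    // Returns the instance registered under id, creating it from config on
    // first use. computeIfAbsent is atomic, so concurrent callers with the
    // same id share one instance and the factory runs at most once per id.
    V get(final String id, final C config) {
        return instances.computeIfAbsent(id, key -> factory.apply(config));
    }
}
```

A consequence of this simple design is that a config passed on a later call with the same id is ignored; handling updated rule configuration would need extra logic that this sketch leaves out.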