1. 程式人生 > 其它 >(十五)soul的Resilience4j體驗和原理(下)

(十五)soul的Resilience4j體驗和原理(下)

技術標籤:souljava

(十五)soul的Resilience4j體驗和原理(下)

目標

  • Resilience4JPlugin核心原始碼解讀

Resilience4JPlugin核心原始碼

1: doExecute:、從上下文中拿到配置資訊,
2:Resilience4JHandle是Resilience4J的引數物件類,根據rule值,json轉化而來,即拿到我們在admin控制檯配置的引數
3:circuitEnable:是否開啟熔斷,配置0:關閉,直接走 rateLimiter方法:處理過程中存在依次則會執行fallback,沒有配置fallback,則直接判處異常
4:circuitEnable配置 1:開啟,則走combined方法

   @Override
   protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
       final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
       assert soulContext != null;
       Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
       if (resilience4JHandle.getCircuitEnable() == 1) {
           return combined(exchange, chain, rule);
       }
       return rateLimiter(exchange, chain, rule);
   }

   private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
       return ratelimiterExecutor.run(
               chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
               .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
   }

  • Resilience4JPlugin#combined解析
   private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
       Resilience4JConf conf = Resilience4JBuilder.build(rule);
       return combinedExecutor.run(
               chain.execute(exchange).doOnSuccess(v -> {
                   if (exchange.getResponse().getStatusCode() != HttpStatus.OK) {
                       HttpStatus status = exchange.getResponse().getStatusCode();
                       exchange.getResponse().setStatusCode(null);
                       throw new CircuitBreakerStatusCodeException(status);
                   }
               }), fallback(combinedExecutor, exchange, conf.getFallBackUri()), conf);
   }

1: Resilience4JConf conf = Resilience4JBuilder.build(rule):根據配置構建CircuitBreakerConfig(斷路器) TimeLimiterConfig(限時) RateLimiterConfig(限流),在soul-plugin-resilience4j#pom.xml
有新增相關依賴

        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-circuitbreaker</artifactId>
            <version>${resilience.version}</version>
        </dependency>
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-timelimiter</artifactId>
            <version>${resilience.version}</version>
        </dependency>
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-ratelimiter</artifactId>
            <version>${resilience.version}</version>
        </dependency>
  • CombinedExecutor#run,run是Resilience4外掛的核心處理邏輯,在這裡完成對soul的增強

1:RateLimiter和CircuitBreaker物件都有Resilience4JRegistryFactory工廠建立而來,Resilience4JRegistryFactory維護了兩個屬性RateLimiterRegistry和CircuitBreakerRegistry,
其中一個基於ConcurrentHashMap 的 CircuitBreakerRegistry ,CircuitBreakerRegistry 是執行緒安全的,並且是原子操作。開發者可以使用 CircuitBreakerRegistry 來建立和檢索 CircuitBreaker 的例項,RateLimiterRegistry也是類似
2: run.transformDeferred:是響應式程式設計 Reactor轉換的使用
3:依次斷路器的操作,限流操作,設定超時時間,如果超時了丟擲超時異常,

    public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
        CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
       //斷路器的操作
        Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                //限流操作
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                //設定超時時間
                .timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
                //如果超時了丟擲超時異常
                .doOnError(TimeoutException.class, t -> circuitBreaker.onError(
                        resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
                        TimeUnit.MILLISECONDS,
                        t));
        if (fallback != null) {
            to = to.onErrorResume(fallback);
        }
        return to;
    }

總結