1. 程式人生 > 實用技巧 >Soul閘道器原始碼閱讀(七)限流外掛初探

Soul閘道器原始碼閱讀(七)限流外掛初探

Soul閘道器原始碼閱讀(七)限流外掛初探


簡介

    前面的文章中對處理流程探索的差不多了,今天來探索下限流外掛:resilience4j

示例執行

環境配置

    啟動下MySQL和redis

docker run -dit --name redis -p 6379:6379 redis
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

Soul-Admin啟動及相關配置

    執行Soul-admin,進入管理介面:系統管理 --> 外掛管理 --> resilience4j ,點選編輯,把它開啟

    進入管理介面的外掛列表:resilience4j 新增選擇器和規則,這裡安裝divide外掛的匹配方式配的,讓divide的/http字首的介面都走限流(因為使用測試時官方本身自帶的HTTP測試)

    規則配置中:token filling number 要設定大於0,不然會報錯

    circuit enable 要設定為0,判斷的時候走限流的邏輯

    其他的:fallback uri 隨便填個路徑,其他的引數都可填1

Soul-Bootstrap配置啟動

    在Soul-Bootstrap中進入相關的依賴,大致如下:

<!-- soul resilience4j plugin start-->
  <dependency>
      <groupId>org.dromara</groupId>
      <artifactId>soul-spring-boot-starter-plugin-resilience4j</artifactId>
       <version>${last.version}</version>
  </dependency>
  <!-- soul resilience4j plugin end-->

    啟動Soul-Bootstrap

HTTP示例啟動

    啟動:soul-examples --> soul-examples-http --> SoulTestHttpApplication

    進入管理介面的:外掛列表 --> divide 能看到相關的註冊介面資訊

    訪問: http://127.0.0.1:9195/http/order/findById?id=1111

    成功執行,下面開始原始碼debug

{
    "id": "1111",
    "name": "hello world findById"
}

原始碼Debug

限流流程順序跟蹤確認

    根據前面的文章,對處理流程基本上有個清晰的認識了,我們通過前面的除錯,知道 RateLimiterPlugin 是繼承 AbstractSoulPlugin ,那它就會走和路由匹配相關的邏輯,如下面對程式碼所示。匹配成功後才走 doExcute 限流邏輯

    # AbstractSoulPlugin
    // 首先進行路由匹配
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        if (pluginData != null && pluginData.getEnabled()) {
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return CheckUtils.checkSelector(pluginName, exchange, chain);
            }
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                if (PluginEnum.WAF.getName().equals(pluginName)) {
                    return doExecute(exchange, chain, null, null);
                }
                return CheckUtils.checkSelector(pluginName, exchange, chain);
            }
            if (selectorData.getLoged()) {
                log.info("{} selector success match , selector name :{}", pluginName, selectorData.getName());
            }
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                if (PluginEnum.WAF.getName().equals(pluginName)) {
                    return doExecute(exchange, chain, null, null);
                }
                return CheckUtils.checkRule(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                //get last
                rule = rules.get(rules.size() - 1);
            } else {
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return CheckUtils.checkRule(pluginName, exchange, chain);
            }
            if (rule.getLoged()) {
                log.info("{} rule success match ,rule name :{}", pluginName, rule.getName());
            }
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

    # RateLimiterPlugin
    // 匹配完成後走限流的邏輯
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // 這裡更加字串轉成物件,所有規則哪裡不能亂填
        Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
        // 這裡判斷 Circle enable 是否為1 走 combined的邏輯,但我們這次想走 limit 的邏輯,所以要填0
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }
        return rateLimiter(exchange, chain, rule);
    }

    // 到這有些複雜,看的不是太懂,只能繼續跟下去
    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        return ratelimiterExecutor.run(
                chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
                .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
    }

    plugin前面程式碼還是看的懂,但rateLimiter開始就有些迷糊,流式程式設計的知識用上都看不懂了,但大致知道是進行限流邏輯

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // 生成限流器
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // 應該是在這觸發的限流邏輯
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}

    繼續跟到上面那個類,我們看到了明顯的生成限流器的邏輯,但有個讓疑惑的是,因為返回的Mono,但沒有看到明顯的限流觸發邏輯。在沒有響應式程式設計的基礎的時候感覺很懵,目前也沒去定位真正的觸發程式碼是在哪?但猜測是在上面註釋中標註的那段觸發的

    因為響應式,沒有辦法跟下去了,我們只能另找路徑,看看具體的限流邏輯是什麼樣的

    通過上面知道:RateLimiter 是限流器,我們檢視它的具體實現

    發現是一個介面,我們看看它有哪些實現,發現有兩個: SemaphoreBasedRateLimiter 和 AtomicRateLimiter

    因為不知道用的哪個,我們在這兩個類中可能會執行的函式都給打上斷點

    重啟發送請求,不斷的跳斷點,終於進入了一個限流器的類: AtomicRateLimiter ,大致如下

    # AtomicRateLimiter
    public long reservePermission(final int permits) {
        long timeoutInNanos = ((AtomicRateLimiter.State)this.state.get()).config.getTimeoutDuration().toNanos();
        AtomicRateLimiter.State modifiedState = this.updateStateWithBackOff(permits, timeoutInNanos);
        boolean canAcquireImmediately = modifiedState.nanosToWait <= 0L;
        if (canAcquireImmediately) {
            this.publishRateLimiterEvent(true, permits);
            return 0L;
        } else {
            boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
            if (canAcquireInTime) {
                this.publishRateLimiterEvent(true, permits);
                return modifiedState.nanosToWait;
            } else {
                this.publishRateLimiterEvent(false, permits);
                return -1L;
            }
        }
    }

    具體實現邏輯,不是我們此次關注的目的,此次是想看它在plugin中處理的流程順序如何

    和前面幾篇一樣,我們在: SoulWebHandler 打上斷點,看看限流器的執行順序是什麼樣的

    通過debug,我們發現順序和我們預期的基本一致:在進入 RateLimiterPlugin 外掛執行的時候,執行的斷點也到了限流器(AtomicRateLimiter),等限流器邏輯執行完畢,divide等外掛才開始執行

關於執行處罰和Mono的一些思考

    我們看一下下面限流執行的程式碼:

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // 生成限流器
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // 應該是在這觸發的限流邏輯
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}

    返回的一個Mono

    我們再看看divide之類的,也是返回的Mono

public class DividePlugin extends AbstractSoulPlugin {

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // set the http url
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
}

    再看看我們非常熟悉: SoulWebHandler

        public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    SoulPlugin plugin = plugins.get(this.index++);
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        return this.execute(exchange);
                    }
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }

    在上面函式中,通過英文,可以看到所有的Plugin都是返回一個Mono

    我們結合響應式程式設計的相關概念:釋出訂閱。也就是說,這些plugin Mono 會發布到一個佇列中,當訂閱的時候,就會取出來順序執行

    訂閱的邏輯大致在那呢,我們翻一翻我們第三篇分析:Soul 閘道器原始碼閱讀(三)請求處理概覽

    在類:HttpServerHandle ,找到很可疑的一段,猜測應該是這:

    public void onStateChange(Connection connection, State newState) {
        if (newState == HttpServerState.REQUEST_RECEIVED) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});
                }

                HttpServerOperations ops = (HttpServerOperations)connection;
                // 在這進行了釋出和訂閱,而handler.apply(ops, ops)會不斷呼叫後面哪些plugin的邏輯
                Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());
            } catch (Throwable var4) {
                log.error(ReactorNetty.format(connection.channel(), ""), var4);
                connection.channel().close();
            }
        }

    }

    而限流的Mono是在divide之前,所以限流就先執行了,大致示意圖如下:

    大意是:fromDirect 函式觸發將 Plugin Mono 放到佇列中;subscribe函式,觸發執行,執行順序先進先出,則GlobalPlugin先進去的,則先開始執行(圖中Global先進的,把上方看做佇列底部,理解意思就行)。那順序就對應上了我們的除錯猜想

    還沒深入研究響應式程式設計,所以也有可能是錯的

疑問點

    在下面這段生成限流器的邏輯中,好像每次請求過來都是進行一個新的生成,有沒有可能進行復用,配置裡面加一個欄位,表示是否更新過,沒有更新,我們就複用我們之前的限流器;有更新我們就新生成一個

    當然上面優化,需要在具體瞭解動態配置更新後,再看看是否可行

    也有可能是不熟悉Resilience4J,可能下面的程式碼中Resilience4JRegistryFactory本身實現了快取複用

public class RateLimiterExecutor implements Executor {

    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // 生成限流器
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // 應該是在這觸發的限流邏輯
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}

總結

    本次文章大致探索了限流外掛:resilience4j的使用配置。除錯驗證它的限流邏輯執行在plugin鏈中執行順序,發現基本符合我們的猜想,限流邏輯的執行和plugin順序一致

    還初步討論提出了plugin鏈在Mono佇列中的執行猜想,後面研究響應式程式設計的時候驗證一下猜想是否正常

    最後提出了一些對限流器生成的一些優化疑問,看後面配置更新相關的分析的時候,是否能驗證自己的猜想

參考連結

Soul閘道器原始碼分析文章列表

Github

掘金