1. 程式人生 > 其它 >【Soul閘道器探祕】http資料同步-變更通知機制

【Soul閘道器探祕】http資料同步-變更通知機制

技術標籤:soulsoul閘道器java

引言

上一篇,梳理除了 soul-admin 在發出資料變更通知前的處理脈絡,本篇開始探究 http 同步策略的變更通知機制,

不同資料變更的通知機制應當是一致的,故本篇以 selector 配置變更通知為切入點進行深入。

配置操作入口

找到 ConfigController,這是配置操作的入口

image-20210130084220014

其持有一個 HttpLongPollingDataChangedListener 引用,通過 HttpLongPollingDataChangedListener 實現配置變更通知訂閱和配置獲取。

通知訂閱:

@PostMapping(value =
"/listener") public void listener(final HttpServletRequest request, final HttpServletResponse response) { longPollingListener.doLongPolling(request, response); }

配置獲取:

@GetMapping("/fetch")
public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
    Map<String,
ConfigData<?>> result = Maps.newHashMap(); for (String groupKey : groupKeys) { ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey)); result.put(groupKey, data); } return SoulAdminResult.success(SoulResultMessage.SUCCESS, result)
; }

通知訂閱實現

使用 HttpLongPollingDataChangedListener#doLongPolling 實現通知訂閱

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // 比較配置組md5
    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
    String clientIp = getRemoteIp(request);
    // 發現配置組變化則立即響應
    if (CollectionUtils.isNotEmpty(changedGroup)) {
        this.generateResponse(response, changedGroup);
        log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
        return;
    }
    // 監聽配置變化
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    // 阻塞客戶端執行緒
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

通過比較 MD5 檢查配置組是否發生變更,若配置組發生變更則立即響應,否則阻塞客戶端執行緒。

此處 compareChangedGroup 實現不做深究,繼續看LongPollingClient 具體處理:

@Override
public void run() {
    this.asyncTimeoutFuture = scheduler.schedule(() -> {
        clients.remove(LongPollingClient.this);
        List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
        sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);
    clients.add(this);
}

這裡將 client 加入 clients 的同時,開啟了一個定時任務,負責超時移除 client 並返回發生變化的配置組資訊。

超時時間為構造時傳入的 HttpConstants.SERVER_MAX_HOLD_TIMEOUT = 60s

配置獲取實現

使用 AbstractDataChangedListener#fetchConfig 實現配置獲取

public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
    ConfigDataCache config = CACHE.get(groupKey.name());
    switch (groupKey) {
        case APP_AUTH:
            ...
        case PLUGIN:
            ...
        case RULE:
            ...
        case SELECTOR:
            List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
            }.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
        case META_DATA:
            ...
        default:
            throw new IllegalStateException("Unexpected groupKey: " + groupKey);
    }
}

這裡從 CACHE 快取獲取對應配置組資訊,包裝成 ConfigData 並返回。

建立訂閱關係

soul-web 端通過 HttpSyncDataConfiguration 初始化 HttpSyncDataService 並注入 spring容器。

HttpSyncDataService#start 方法在初始化時完成配置獲取和訂閱:

private void start() {
    // It could be initialized multiple times, so you need to control that.
    if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        this.fetchGroupConfig(ConfigGroupEnum.values());
        int threadSize = serverList.size();
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("http-long-polling", true));
        // start long polling, each server creates a thread to listen for changes.
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {
        log.info("soul http long polling was started, executor=[{}]", executor);
    }
}

1)配置獲取

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
    for (int index = 0; index < this.serverList.size(); index++) {
        String server = serverList.get(index);
        try {
            this.doFetchGroupConfig(server, groups);
            break;
        } catch (SoulException e) {
            // no available server, throw exception.
            if (index >= serverList.size() - 1) {
                throw e;
            }
            log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
        }
    }
}

doFetchGroupConfig 內部發起配置獲取請求並更新本地快取

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    ...
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    ...
    try {
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        ...
    }
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    ...
}

2)配置訂閱

藉助 HttpLongPollingTask 完成

@Override
public void run() {
    while (RUNNING.get()) {
        for (int time = 1; time <= retryTimes; time++) {
            try {
                doLongPolling(server);
            } catch (Exception e) {
                ...
            }
        }
    }
    log.warn("Stop http long polling.");
}

HttpLongPollingTask 不斷迴圈 doLongPolling,此處有 retry 操作

private void doLongPolling(final String server) {
    ...
    String listenerUrl = server + "/configs/listener";
    ...
    try {
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {
        ...
    }
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
            log.info("Group config changed: {}", Arrays.toString(changedGroups));
            this.doFetchGroupConfig(server, changedGroups);
        }
    }
}

doLongPolling 內部發起 post 請求訂閱配置變更,若發生變更則重新獲取配置。

至此,通知訂閱處理脈絡已清晰:

  1. soul-web 端通過 http 發起訂閱請求

  2. soul-admin 端收到請求,通過比較 MD5 檢查配置組是否存在變更

    • 若存在變更,則立即響應變更組資訊
    • 若無變更,則阻塞客戶端執行緒,並開啟定時任務 60s 後重新比較配置組變更並返回響應
  3. soul-web 端收到響應,判斷配置組是否存在變更

    • 若存在變更,則發起獲取配置請求獲取最新配置資訊
  4. soul-web 重新發起訂閱請求

配置變更

上回我們說到AbstractDataChangedListener 的 onSelectorChanged 實現:

public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    if (CollectionUtils.isEmpty(changed)) {
        return;
    }
    // 更新 selector 快取
    this.updateSelectorCache();
    // selector 變更後處理,實現具體的變更通知
    this.afterSelectorChanged(changed, eventType);
}

這裡 selector 變更處理是先更快取後發通知,繼續看 afterSelectorChanged 實現。

HttpLongPollingDataChangedListener 真正實現了 AbstractDataChangedListener 的 afterSelectorChanged:

@Override
protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    scheduler.execute(new DataChangeTask(ConfigGroupEnum.SELECTOR));
}

由定時任務重複執行 DataChangeTask,DataChangeTask 具體處理如下:

@Override
public void run() {
    for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
        LongPollingClient client = iter.next();
        iter.remove();
        client.sendResponse(Collections.singletonList(groupKey));
        log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
}

DataChangeTask 負責從 clients 依次移除 LongPollingClient 並將 groupKey 作為響應返回,sendResponse 內部處理如下:

void sendResponse(final List<ConfigGroupEnum> changedGroups) {
    // cancel scheduler
    if (null != asyncTimeoutFuture) {
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
    asyncContext.complete();
}

負責生成響應報文並非同步響應客戶端,注意有個 asyncTimeoutFuture.cancel 操作,取消之前的 60s 超時響應。

總結

本篇梳理和分析了 soul-web 端到 soul-admin 端的配置變更通知訂閱關係建立過程,配合上配置獲取介面,完成了整個 http 資料同步策略的變更通知機制。

下篇,將探究 http 同步策略的web端處理變更通知,期待驚喜。

個人知識庫

高效能微服務API閘道器-Soul