【Soul閘道器探祕】http資料同步-Web端處理變更通知
引言
上一篇,梳理http 資料同步策略的變更通知機制,本篇開始探究配置變更通知到達後, soul-web
端的處理響應。
不同資料變更的通知機制應當是一致的,故本篇以 selector 配置變更通知為切入點進行深入。
通知處理入口
上回我們說到 HttpSyncDataService 的 doLongPolling,在其內部發起通知訂閱並接收響應通知:
private void doLongPolling(final String server) {
...
String listenerUrl = server + "/configs/listener";
...
try {
// 發起監聽請求
String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
log.debug("listener result: [{}]", json);
groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data" );
} catch (RestClientException e) {
...
}
// 處理變更通知
if (groupJson != null) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
log. info("Group config changed: {}", Arrays.toString(changedGroups));
// 獲取組配置
this.doFetchGroupConfig(server, changedGroups);
}
}
}
在收到變更通知時,若存在配置組變更,則按變更組獲取相應配置。
獲取配置
獲取組配置處理如下:
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
...
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
...
try {
json = this.httpClient.getForObject(url, String.class);
} catch (RestClientException e) {
...
}
// update local cache
boolean updated = this.updateCacheWithJson(json);
...
}
內部發起配置獲取請求並更新本地快取。
更新配置組快取
由 HttpSyncDataService 實現本地快取更新:
private boolean updateCacheWithJson(final String json) {
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
JsonObject data = jsonObject.getAsJsonObject("data");
// if the config cache will be updated?
return factory.executor(data);
}
轉成 Json 物件後交由 DataRefreshFactory 進行處理。
DataRefreshFactory 處理如下:
public boolean executor(final JsonObject data) {
final boolean[] success = {false};
ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
return success[0];
}
呼叫相應資料重新整理類重新整理資料。
統一由 AbstractDataRefresh 的 refresh 進行處理:
public Boolean refresh(final JsonObject data) {
boolean updated = false;
JsonObject jsonObject = convert(data);
if (null != jsonObject) {
ConfigData<T> result = fromJson(jsonObject);
if (this.updateCacheIfNeed(result)) {
updated = true;
refresh(result.getData());
}
}
return updated;
}
先更新本地快取,再呼叫子類實現的 refresh。
此處的更新本地快取處理,由子類 SelectorDataRefresh 的 updateCacheIfNeed 實現:
protected boolean updateCacheIfNeed(final ConfigData<SelectorData> result) {
return updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR);
}
向父類 AbstractDataRefresh 的 updateCacheIfNeed 指定更新 selector 配置組。
父類 AbstractDataRefresh 的 updateCacheIfNeed 處理:
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// 首次初始化快取
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// 必須比較最後更新時間
if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {
...
holder.result = true;
return newVal;
}
...
return oldVal;
});
return holder.result;
}
通過比較新老快取的 MD5 值來判定是否發生變更,存在變更則更新本地快取(注意還有最後更新時間判定)。
處理重新整理事件
SelectorDataRefresh 的 refresh 實現:
protected void refresh(final List<SelectorData> data) {
if (CollectionUtils.isEmpty(data)) {
log.info("clear all selector cache, old cache");
data.forEach(pluginDataSubscriber::unSelectorSubscribe);
pluginDataSubscriber.refreshSelectorDataAll();
} else {
// update cache for UpstreamCacheManager
pluginDataSubscriber.refreshSelectorDataAll();
data.forEach(pluginDataSubscriber::onSelectorSubscribe);
}
}
- 若最新資料為空,則迴圈取消訂閱並重新整理所有選擇器資料,實際是清空選擇器快取。
- 若最新資料不為空,則重新整理所有選擇器資料並迴圈響應選擇器訂閱事件處理,實際是更新上游服務快取。
取消訂閱
CommonPluginDataSubscriber 實現訂閱取消:
public void unSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);
}
subscribeDataHandler 對 selectorData 的 delete 處理:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
...
} else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
...
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) {
...
}
});
}
從 BaseDataCache 刪除目標選擇器資料,並移除選擇器。
此處由 DividePluginDataHandler 提供 removeSelector 實現:
public void removeSelector(final SelectorData selectorData) {
UpstreamCacheManager.getInstance().removeByKey(selectorData.getId());
}
根據 selector id 移除快取的上游服務,注意只是從 UPSTREAM_MAP_TEMP 移除
public void removeByKey(final String key) {
UPSTREAM_MAP_TEMP.remove(key);
}
重新整理資料
CommonPluginDataSubscriber 實現資料重新整理:
public void refreshSelectorDataAll() {
BaseDataCache.getInstance().cleanSelectorData();
}
注意這裡的 refresh all 實際是做的 clean 操作。
BaseDataCache 的 cleanSelectorData 處理:
public void cleanSelectorData() {
SELECTOR_MAP.clear();
}
直接清除 SELECTOR_MAP 所有資料。
響應訂閱
CommonPluginDataSubscriber 實現訂閱響應:
public void onSelectorSubscribe(final SelectorData selectorData) {
subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
}
subscribeDataHandler 對 selectorData 的 update 處理:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
...
} else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) {
...
}
} else if (data instanceof RuleData) {
...
}
});
}
快取選擇器資料到 BaseDataCache,並處理選擇器。
此處由 DividePluginDataHandler 提供 handlerSelector 實現:
public void handlerSelector(final SelectorData selectorData) {
UpstreamCacheManager.getInstance().submit(selectorData);
}
提交選擇器資料到 UpstreamCacheManager。
UpstreamCacheManager 的 submit 處理:
public void submit(final SelectorData selectorData) {
final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);
if (null != upstreamList && upstreamList.size() > 0) {
UPSTREAM_MAP.put(selectorData.getId(), upstreamList);
UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);
} else {
UPSTREAM_MAP.remove(selectorData.getId());
UPSTREAM_MAP_TEMP.remove(selectorData.getId());
}
}
根據 selector id 更新 UPSTREAM_MAP 和 UPSTREAM_MAP_TEMP。
總結
本篇梳理和分析了配置變更通知到達後 soul-web
端的處理流程,最終處理主要是更新本地配置快取以及維護上游服務散列表。
soul-web
收到變更通知後處理流程如下:
soul-web 端收到響應
- 若配置組資料存在變更,則發起獲取配置請求獲取最新配置資訊
- 更新配置組快取
- 迴圈處理配置資料重新整理事件
- 若最新配置資料為空,則刪除本地配置資料並移除上游服務
- 若最新配置資料不為空,則快取配置組資料並更新上游服務
- 若配置組資料無變更,不作處理