Soul閘道器原始碼分析-10期(周總結)
文章目錄
後臺與閘道器資料同步 (Websocket篇)
後臺如何建立Websocket?
DataSyncConfiguration: 作為 Spring Bean 的配置工廠, 可以根據配置資訊, 構建各類監聽器, 包括 HTTP 長輪詢方式、Zookeeper 方式、Nacos 方式、Websocket 方法.
@Configuration
public class DataSyncConfiguration {
// soul-admin 專案的配置資訊中, 使用 soul.sync.websocket.enabled 開啟或關閉 websocket
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties (WebsocketSyncProperties.class)
static class WebsocketListener {
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
}
}
WebsocketListener: 作為 DataSyncConfiguration
的內部類, 負責 websocket 監聽器初始化.
WebsocketCollector: 監聽 websocket 連線及接收資訊, 維護所有連線後臺的 session 會話, 提供 send()
方法通知 session 資訊.
閘道器如何建立Websocket?
WebsocketSyncDataConfiguration: 作為 Spring Bean 的配置工廠, 是閘道器端構建 Websocket 通訊的入口. (獨立出一個啟動專案 soul-spring-boot-starter-sync-data-websocket
, 供閘道器自由選用)
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
// 收集所有註冊為 Bean 的訂閱器, 如 PluginDataSubscriber、MetaDataSubscriber、AuthDataSubscriber
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
// soul-bootstrap 專案的配置資訊中, 使用 soul.sync.websocket 配置要建立連線的後臺路徑
@Bean
@ConfigurationProperties(prefix = "soul.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
WebsocketSyncDataService: 獲取所有註冊為 Bean 的 WebsocketConfig
以及各種 DataSubscriber
訂閱器, 構建實現了 WebsocketClient
的 SoulWebsocketClient
列表
SoulWebsocketClient: Websocket
通訊類, 監聽 websocket 連線及接收資訊, 在接收到後臺傳來的資訊後會通知各個訂閱器.
public final class SoulWebsocketClient extends WebSocketClient {
private final WebsocketDataHandler websocketDataHandler;
private void handleResult(final String result) {
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
// 根據傳入資訊得到資料變更的事件型別, 如 refresh、update、delete 等
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
websocketDataHandler.executor(groupEnum, json, eventType);
}
}
WebsocketDataHandler: 初始化時構建各類實現 AbstractDataHandler
的資料處理類並快取.
public class WebsocketDataHandler {
// 快取所有 DataHandler 資料變動處理類
private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
// 根據資料變動事件型別, 呼叫相應的 DataHandler 資料處理類
ENUM_MAP.get(type).handle(json, eventType);
}
}
閘道器資料變動呼叫鏈
實現 Websocket 通訊的入口類 SoulWebsocketClient
在接到後臺通訊後, 呼叫 WebsocketDataHandler
的 executor()
方法匹配資訊型別, 並呼叫對應的 DataHandler
的 handler()
去處理資訊.
AbstractDataHandler: 實現 handler()
方法, 根據事件的型別 (如重新整理、更新、建立、刪除等), 呼叫對應事件抽象方法.
public abstract class AbstractDataHandler<T> implements DataHandler {
// 根據資料的事件型別 (eventType) 分發到各自方法, 這些被呼叫的方法由子類實現, 因為不同型別的元資料處理類的處理方式不同
@Override
public void handle(final String json, final String eventType) {
List<T> dataList = convert(json);
if (CollectionUtils.isNotEmpty(dataList)) {
DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
switch (eventTypeEnum) {
case REFRESH:
case MYSELF:
doRefresh(dataList);
break;
case UPDATE:
case CREATE:
doUpdate(dataList);
break;
case DELETE:
doDelete(dataList);
break;
default:
break;
}
}
}
}
XXXDataHandler: 這裡指的是 AbstractDataHandler
的各個實現類 (如 PluginDataHandler
等), 主要作用是呼叫其訂閱器.
不同的 DataHandler
呼叫的訂閱方法不同:
PluginDataHandler
會呼叫onSubscribe()
通知外掛元資料變更SelectorDataHandler
會呼叫onSelectorSubscribe()
通知選擇器元資料變更RuleDataHandler
會呼叫onRuleSubscribe()
通知規則元資料變更
@RequiredArgsConstructor
public class PluginDataHandler extends AbstractDataHandler<PluginData> {
private final PluginDataSubscriber pluginDataSubscriber;
@Override
protected void doUpdate(final List<PluginData> dataList) {
// 呼叫訂閱器的 onSubscribe(), 傳送資料物件 PluginData
dataList.forEach(pluginDataSubscriber::onSubscribe);
}
// ...
}
CommonPluginDataSubscriber: 訂閱器的 onSubscribe()
方法會通知到所有注入為 Bean 的 PluginDataHandler
類 (不要和前面的同名類混淆, 它是 soul-plugin-base
下的介面, 它的實現類在各個可插拔外掛包)
public class CommonPluginDataSubscriber implements PluginDataSubscriber {
// 收集所有註冊為 Bean 的資料處理器並快取, 比如 HTTP 外掛 divide 下的 DividePluginDataHandler
private final Map<String, PluginDataHandler> handlerMap;
// 外掛元資料變動呼叫
@Override
public void onSubscribe(final PluginData pluginData) {
BaseDataCache.getInstance().cachePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
}
// 選擇器元資料變動呼叫
@Override
public void onSelectorSubscribe(final SelectorData selectorData) {
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
}
// 規則元資料變動呼叫
@Override
public void onRuleSubscribe(final RuleData ruleData) {
BaseDataCache.getInstance().cacheRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
}
}
TIPS
整個大專案下存在兩個同名的類 PluginDataHandler, 其中一個在專案 soul-sync-data-websocket
下, 用於通知外掛元資料變更, 另一個在 soul-plugin-base
下, 用於定義各個外掛的各個型別元資料更新.
總結下這兩個類命名的意義, soul-sync-data-websocket
下類名的 “plugin” 指元資料的型別為外掛類, soul-plugin-base
下類名的 “plugin” 指繼承它的子類來自與各個可插播外掛, 比如divide、dubbo外掛等