1. 程式人生 > 其它 >Soul閘道器原始碼分析-10期(周總結)

Soul閘道器原始碼分析-10期(周總結)

技術標籤:閘道器java

文章目錄


後臺與閘道器資料同步 (Websocket篇)



後臺如何建立Websocket?


DataSyncConfiguration WebsocketListener WebsocketCollector

DataSyncConfiguration: 作為 Spring Bean 的配置工廠, 可以根據配置資訊, 構建各類監聽器, 包括 HTTP 長輪詢方式、Zookeeper 方式、Nacos 方式、Websocket 方法.

@Configuration
public class DataSyncConfiguration {
  
  // soul-admin 專案的配置資訊中, 使用 soul.sync.websocket.enabled 開啟或關閉 websocket
  @Configuration
  @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
  @EnableConfigurationProperties
(WebsocketSyncProperties.class) static class WebsocketListener { @Bean @ConditionalOnMissingBean(WebsocketCollector.class) public WebsocketCollector websocketCollector() { return new WebsocketCollector(); } } }

WebsocketListener: 作為 DataSyncConfiguration 的內部類, 負責 websocket 監聽器初始化.


WebsocketCollector: 監聽 websocket 連線及接收資訊, 維護所有連線後臺的 session 會話, 提供 send() 方法通知 session 資訊.




閘道器如何建立Websocket?


WebsocketSyncDataConfiguration WebsocketSyncDataService SoulWebsocketClient WebsocketDataHandler AbstractDataHandler

WebsocketSyncDataConfiguration: 作為 Spring Bean 的配置工廠, 是閘道器端構建 Websocket 通訊的入口. (獨立出一個啟動專案 soul-spring-boot-starter-sync-data-websocket , 供閘道器自由選用)

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
  
  // 收集所有註冊為 Bean 的訂閱器,  如 PluginDataSubscriber、MetaDataSubscriber、AuthDataSubscriber
  @Bean
  public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    log.info("you use websocket sync soul data.......");
    return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
  }
  
  // soul-bootstrap 專案的配置資訊中, 使用 soul.sync.websocket 配置要建立連線的後臺路徑
  @Bean
  @ConfigurationProperties(prefix = "soul.sync.websocket")
  public WebsocketConfig websocketConfig() {
    return new WebsocketConfig();
  }
}

WebsocketSyncDataService: 獲取所有註冊為 Bean 的 WebsocketConfig 以及各種 DataSubscriber 訂閱器, 構建實現了 WebsocketClientSoulWebsocketClient 列表


SoulWebsocketClient: Websocket 通訊類, 監聽 websocket 連線及接收資訊, 在接收到後臺傳來的資訊後會通知各個訂閱器.

public final class SoulWebsocketClient extends WebSocketClient {
  
  private final WebsocketDataHandler websocketDataHandler;
  
	private void handleResult(final String result) {
    WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
    ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
    // 根據傳入資訊得到資料變更的事件型別, 如 refresh、update、delete 等
    String eventType = websocketData.getEventType();
    String json = GsonUtils.getInstance().toJson(websocketData.getData());
    websocketDataHandler.executor(groupEnum, json, eventType);
  }
}

WebsocketDataHandler: 初始化時構建各類實現 AbstractDataHandler 的資料處理類並快取.

public class WebsocketDataHandler {
  
  // 快取所有 DataHandler 資料變動處理類
  private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

  public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                              final List<MetaDataSubscriber> metaDataSubscribers,
                              final List<AuthDataSubscriber> authDataSubscribers) {
    ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
    ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
  }

  public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
    // 根據資料變動事件型別, 呼叫相應的 DataHandler 資料處理類
    ENUM_MAP.get(type).handle(json, eventType);
  }
}




閘道器資料變動呼叫鏈


實現 Websocket 通訊的入口類 SoulWebsocketClient 在接到後臺通訊後, 呼叫 WebsocketDataHandlerexecutor() 方法匹配資訊型別, 並呼叫對應的 DataHandlerhandler() 去處理資訊.

SoulWebsocketClient WebsocketDataHandler AbstractDataHandler PluginDataHandler SelectorDataHandler RuleDataHandler AuthDataHandler MetaDataHandler PluginDataSubscriber CommonPluginDataSubscriber AuthDataSubscriber SignAuthDataSubscriber MetaDataSubscriber org.dromara.soul.plugin.base.handler.PluginDataHandler

AbstractDataHandler: 實現 handler() 方法, 根據事件的型別 (如重新整理、更新、建立、刪除等), 呼叫對應事件抽象方法.

public abstract class AbstractDataHandler<T> implements DataHandler {

  // 根據資料的事件型別 (eventType) 分發到各自方法, 這些被呼叫的方法由子類實現, 因為不同型別的元資料處理類的處理方式不同
  @Override
  public void handle(final String json, final String eventType) {
    List<T> dataList = convert(json);
    if (CollectionUtils.isNotEmpty(dataList)) {
      DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
      switch (eventTypeEnum) {
        case REFRESH:
        case MYSELF:
          doRefresh(dataList);
          break;
        case UPDATE:
        case CREATE:
          doUpdate(dataList);
          break;
        case DELETE:
          doDelete(dataList);
          break;
        default:
          break;
      }
    }
  }
}

XXXDataHandler: 這裡指的是 AbstractDataHandler 的各個實現類 (如 PluginDataHandler 等), 主要作用是呼叫其訂閱器.

不同的 DataHandler 呼叫的訂閱方法不同:

  • PluginDataHandler 會呼叫 onSubscribe() 通知外掛元資料變更
  • SelectorDataHandler 會呼叫 onSelectorSubscribe() 通知選擇器元資料變更
  • RuleDataHandler 會呼叫 onRuleSubscribe() 通知規則元資料變更
@RequiredArgsConstructor
public class PluginDataHandler extends AbstractDataHandler<PluginData> {
  
  private final PluginDataSubscriber pluginDataSubscriber;
  
  @Override
  protected void doUpdate(final List<PluginData> dataList) {
    // 呼叫訂閱器的 onSubscribe(), 傳送資料物件 PluginData
    dataList.forEach(pluginDataSubscriber::onSubscribe);
  }
  
  // ...
}

CommonPluginDataSubscriber: 訂閱器的 onSubscribe() 方法會通知到所有注入為 Bean 的 PluginDataHandler 類 (不要和前面的同名類混淆, 它是 soul-plugin-base 下的介面, 它的實現類在各個可插拔外掛包)
在這裡插入圖片描述

public class CommonPluginDataSubscriber implements PluginDataSubscriber {
  
  // 收集所有註冊為 Bean 的資料處理器並快取, 比如 HTTP 外掛 divide 下的 DividePluginDataHandler
  private final Map<String, PluginDataHandler> handlerMap;
  
  // 外掛元資料變動呼叫
  @Override
  public void onSubscribe(final PluginData pluginData) {
    BaseDataCache.getInstance().cachePluginData(pluginData);
    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
  }
  
  // 選擇器元資料變動呼叫
  @Override
  public void onSelectorSubscribe(final SelectorData selectorData) {
    BaseDataCache.getInstance().cacheSelectData(selectorData);
    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
  }
  
  // 規則元資料變動呼叫
  @Override
  public void onRuleSubscribe(final RuleData ruleData) {
    BaseDataCache.getInstance().cacheRuleData(ruleData);
    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
  }
}




TIPS


整個大專案下存在兩個同名的類 PluginDataHandler, 其中一個在專案 soul-sync-data-websocket 下, 用於通知外掛元資料變更, 另一個在 soul-plugin-base 下, 用於定義各個外掛的各個型別元資料更新.


總結下這兩個類命名的意義, soul-sync-data-websocket 下類名的 “plugin” 指元資料的型別為外掛類, soul-plugin-base 下類名的 “plugin” 指繼承它的子類來自與各個可插播外掛, 比如divide、dubbo外掛等