1. 程式人生 > 其它 >Soul API閘道器資料同步之WebSocket

Soul API閘道器資料同步之WebSocket

技術標籤:閘道器java

前言

前面的一系列文章,先說了幾個example的使用,然後說了SoulWebHandler、SoulPlugin、MatchStrage、MatchStrategy;前面幾個主要從使用的角度說,後面的基本源於單獨的類(介面)來進行原始碼解析的。

那麼從本篇文章開始將講一些系統性的東西,那就從資料同步來開始我們說資料同步的程式碼分析模組。

資料同步源頭

1、配置檔案

說到資料同步,我們要先說一下yml配置檔案,這裡涉及兩個地方,一個是soul-admin,另一個是soul-bootstrap,這兩個yml的檔案分別是:

soul-admin
soul:
  database:
dialect: mysql init_script: "META-INF/schema.sql" init_enable: true sync: websocket: enabled: true soul-bootstrap soul : file: enabled: true corss: enabled: true dubbo : parameter: multi sync: websocket : urls:
ws://localhost:9095/websocket

2、同步介面與實現

說完配置檔案,我們接著說一下SyncDataService這個介面,這個介面從命名上就可以直觀的看出是作為同步資料用的,程式碼如下:

/**
 * The interface Sync data service.
 */
public interface SyncDataService {
}

既然說到SyncDataService介面,那便自然有實現類來做實現,不過這個介面並沒有定義方法來做約束,意思就是說在各自的實現類中做各自的實現,這裡先看一下具體有哪些實現類,如圖:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-kdhO6Wu5-1611333797000)(https://uploader.shimo.im/f/Hp2ijLXTsN1RatcJ.png!thumbnail?fileGuid=XHDVWWJqVdQHY9xy)]

圖中有四個實現類,不過本篇文章只是以WebsocketSyncDataService角度來進行解析,那就直接看看這個類的程式碼實現:

@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    private final List<WebSocketClient> clients = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor;
    
    /**
     * Instantiates a new Websocket sync cache.
     *
     * @param websocketConfig      the websocket config
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }
    }
    
    @Override
    public void close() {
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }
}

WebsocketSyncDataService還實現了AutoCloseable介面,不過我們就看程式碼主題,在這個類中第一就是定義一個WebsocketSyncDataService的有參建構函式,然後就是實現了AutoCloseable類的close方法。
當然說到說到這裡就又要說WebsocketSyncDataService這個類是怎麼來的,那自然是基於Spring Boot自動裝配的原理了。這個裝配的位置是在WebsocketSyncDataConfiguration類中,程式碼如下:

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
    /**
     * Websocket sync data service.
     *
     * @param websocketConfig   the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers   the meta subscribers
     * @param authSubscribers   the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }
    // 省略部分程式碼
}

在上面的這段程式碼中,還要關注下@ConditionalOnProperty這個註解,註解裡的引數(prefix = “soul.sync.websocket”)啥的大家是不是感覺很熟悉?這個“soul.sync.websocket”是不是在配置檔案中的。至於WebsocketSyncDataConfiguration的構建,就是前面說的Spring Boot自動構建的原理,卻在前面的文章中也有提及,那麼這裡就不再多提了。

啟動同步

1、On Message

說到啟動時同步,那就不得不提到WebsocketCollector這個類,在啟動soul-bootstrap時便會呼叫其中的WebsocketCollector.onMessage(…)方法,但是這個類的建立時機時在soul-admin專案啟動的時候進行構建的,這裡就不多說了。這裡要注意@ServerEndpoint("/websocket")、@OnMessage這兩個註解,關於這兩個註解乃是WebSocket的,這裡只是提一下,@Serverendpoint作為WebSocket伺服器的端點,而 @OnMessage註解的Java方法用於接收傳入的WebSocket資訊。這個資訊可以是文字,二進位制資訊。那意思就很明顯了,直接看onMessage方法的程式碼了,如下:

@OnMessage
public void onMessage(final String message, final Session session) {
    if (message.equals(DataEventTypeEnum.MYSELF.name())) {
        try {
            ThreadLocalUtil.put(SESSION_KEY, session);
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
        } finally {
            ThreadLocalUtil.clear();
        }
    }
}

這個方法的首先判斷DataEventType,若匹配便進入程式碼塊,程式碼塊中先把session存入快取中,然後就是根據SyncDataService.class來進行依賴查詢,來獲得SyncDataService類物件,緊接著便是呼叫syncAll方法。

2、syncAll方法

這個方法裡涉及到外掛、選擇器、規則這寫資料,並且設計到Spring的事件釋出,程式碼如下:

SyncDataServiceImpl.class

@Override
public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List<PluginData> pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List<SelectorData> selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List<RuleData> ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
}

上面的程式碼很直白,開始呼叫了appAuthService.syncData()方法,然後便是獲取pluginData,緊接著通過Spring的事件釋出器,來進行時間的釋出。後面依次是selector、rule。最後再次呼叫metaDataService.syncData()方法。
appAuthService.syncData()方法程式碼如下:、

AppAuthServiceImpl.class

@Override
public SoulAdminResult syncData() {
    List<AppAuthDO> appAuthDOList = appAuthMapper.selectAll();
    if (CollectionUtils.isNotEmpty(appAuthDOList)) {
        List<AppAuthData> dataList = appAuthDOList.stream().map(this::buildByEntity).collect(Collectors.toList());
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.APP_AUTH,
                DataEventTypeEnum.REFRESH,
                dataList));
    }
    return SoulAdminResult.success();
}

這段程式碼是同步許可權的資料,這個暫不多說,我們還是說說metaDataService.syncData()方法的程式碼:
MetaDataServiceImpl.class

@Override
public void syncData() {
    List<MetaDataDO> all = metaDataMapper.findAll();
    if (CollectionUtils.isNotEmpty(all)) {
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all)));
    }
}

3、DataChangedEventDispatcher 接收事件

@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                break;
            case PLUGIN:
                listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                break;
            case RULE:
                listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                break;
            case SELECTOR:
                listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                break;
            case META_DATA:
                listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}

在前面Spring事件釋出器發不完事件後,這裡便開始接受事件了,就是說開始處理事件了。程式碼中是根據傳入的事件,然後event.getGroupKey(),然後匹配其事件的key。

@Override
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
    WebsocketData<MetaData> configData =
            new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
    WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}

手動同步

手動同步就是呼叫metaDataService.syncData()方法,後面細節基本一致,那就不多說了。

總結

本篇文章從配置檔案、同步的介面、然後是WebSocket(涉及到註解)、然後就是同步的一個基本流程,這裡面還涉及到事件的釋出與事件的處理。