Soul API閘道器資料同步之WebSocket
前言
前面的一系列文章,先說了幾個example的使用,然後說了SoulWebHandler、SoulPlugin、MatchStrage、MatchStrategy;前面幾個主要從使用的角度說,後面的基本源於單獨的類(介面)來進行原始碼解析的。
那麼從本篇文章開始將講一些系統性的東西,那就從資料同步來開始我們說資料同步的程式碼分析模組。
資料同步源頭
1、配置檔案
說到資料同步,我們要先說一下yml配置檔案,這裡涉及兩個地方,一個是soul-admin,另一個是soul-bootstrap,這兩個yml的檔案分別是:
soul-admin
soul:
database:
dialect: mysql
init_script: "META-INF/schema.sql"
init_enable: true
sync:
websocket:
enabled: true
soul-bootstrap
soul :
file:
enabled: true
corss:
enabled: true
dubbo :
parameter: multi
sync:
websocket :
urls: ws://localhost:9095/websocket
2、同步介面與實現
說完配置檔案,我們接著說一下SyncDataService這個介面,這個介面從命名上就可以直觀的看出是作為同步資料用的,程式碼如下:
/**
* The interface Sync data service.
*/
public interface SyncDataService {
}
既然說到SyncDataService介面,那便自然有實現類來做實現,不過這個介面並沒有定義方法來做約束,意思就是說在各自的實現類中做各自的實現,這裡先看一下具體有哪些實現類,如圖:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-kdhO6Wu5-1611333797000)(https://uploader.shimo.im/f/Hp2ijLXTsN1RatcJ.png!thumbnail?fileGuid=XHDVWWJqVdQHY9xy)]
圖中有四個實現類,不過本篇文章只是以WebsocketSyncDataService角度來進行解析,那就直接看看這個類的程式碼實現:
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
/**
* Instantiates a new Websocket sync cache.
*
* @param websocketConfig the websocket config
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
@Override
public void close() {
for (WebSocketClient client : clients) {
if (!client.isClosed()) {
client.close();
}
}
if (Objects.nonNull(executor)) {
executor.shutdown();
}
}
}
WebsocketSyncDataService還實現了AutoCloseable介面,不過我們就看程式碼主題,在這個類中第一就是定義一個WebsocketSyncDataService的有參建構函式,然後就是實現了AutoCloseable類的close方法。
當然說到說到這裡就又要說WebsocketSyncDataService這個類是怎麼來的,那自然是基於Spring Boot自動裝配的原理了。這個裝配的位置是在WebsocketSyncDataConfiguration類中,程式碼如下:
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
/**
* Websocket sync data service.
*
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
// 省略部分程式碼
}
在上面的這段程式碼中,還要關注下@ConditionalOnProperty這個註解,註解裡的引數(prefix = “soul.sync.websocket”)啥的大家是不是感覺很熟悉?這個“soul.sync.websocket”是不是在配置檔案中的。至於WebsocketSyncDataConfiguration的構建,就是前面說的Spring Boot自動構建的原理,卻在前面的文章中也有提及,那麼這裡就不再多提了。
啟動同步
1、On Message
說到啟動時同步,那就不得不提到WebsocketCollector這個類,在啟動soul-bootstrap時便會呼叫其中的WebsocketCollector.onMessage(…)方法,但是這個類的建立時機時在soul-admin專案啟動的時候進行構建的,這裡就不多說了。這裡要注意@ServerEndpoint("/websocket")、@OnMessage這兩個註解,關於這兩個註解乃是WebSocket的,這裡只是提一下,@Serverendpoint作為WebSocket伺服器的端點,而 @OnMessage註解的Java方法用於接收傳入的WebSocket資訊。這個資訊可以是文字,二進位制資訊。那意思就很明顯了,直接看onMessage方法的程式碼了,如下:
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
try {
ThreadLocalUtil.put(SESSION_KEY, session);
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
} finally {
ThreadLocalUtil.clear();
}
}
}
這個方法的首先判斷DataEventType,若匹配便進入程式碼塊,程式碼塊中先把session存入快取中,然後就是根據SyncDataService.class來進行依賴查詢,來獲得SyncDataService類物件,緊接著便是呼叫syncAll方法。
2、syncAll方法
這個方法裡涉及到外掛、選擇器、規則這寫資料,並且設計到Spring的事件釋出,程式碼如下:
SyncDataServiceImpl.class
@Override
public boolean syncAll(final DataEventTypeEnum type) {
appAuthService.syncData();
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
metaDataService.syncData();
return true;
}
上面的程式碼很直白,開始呼叫了appAuthService.syncData()方法,然後便是獲取pluginData,緊接著通過Spring的事件釋出器,來進行時間的釋出。後面依次是selector、rule。最後再次呼叫metaDataService.syncData()方法。
appAuthService.syncData()方法程式碼如下:、
AppAuthServiceImpl.class
@Override
public SoulAdminResult syncData() {
List<AppAuthDO> appAuthDOList = appAuthMapper.selectAll();
if (CollectionUtils.isNotEmpty(appAuthDOList)) {
List<AppAuthData> dataList = appAuthDOList.stream().map(this::buildByEntity).collect(Collectors.toList());
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.APP_AUTH,
DataEventTypeEnum.REFRESH,
dataList));
}
return SoulAdminResult.success();
}
這段程式碼是同步許可權的資料,這個暫不多說,我們還是說說metaDataService.syncData()方法的程式碼:
MetaDataServiceImpl.class
@Override
public void syncData() {
List<MetaDataDO> all = metaDataMapper.findAll();
if (CollectionUtils.isNotEmpty(all)) {
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all)));
}
}
3、DataChangedEventDispatcher 接收事件
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
在前面Spring事件釋出器發不完事件後,這裡便開始接受事件了,就是說開始處理事件了。程式碼中是根據傳入的事件,然後event.getGroupKey(),然後匹配其事件的key。
@Override
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
WebsocketData<MetaData> configData =
new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
手動同步
手動同步就是呼叫metaDataService.syncData()方法,後面細節基本一致,那就不多說了。
總結
本篇文章從配置檔案、同步的介面、然後是WebSocket(涉及到註解)、然後就是同步的一個基本流程,這裡面還涉及到事件的釋出與事件的處理。