Soul Gateway Source Code Study 02
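Data synchronization
In the Soul gateway, configuration such as routing rules, plugin management, load-balancing strategies, rate limiting, and circuit breaking can all be changed dynamically, and changes are synchronized within seconds. Configuration changes can be propagated through several mechanisms, including websocket, http, zookeeper, and nacos, which keep soul-admin and soul-web in sync.
websocket synchronization
When soul-web starts, it opens a websocket connection to soul-admin, and soul-admin then pushes the full data set once. If the configuration changes afterwards, soul-admin actively pushes only the incremental data to soul-web over the same websocket. On the server side, soul-admin relies on Tomcat's websocket implementation; on the client side, soul-web uses the third-party java-websocket library to establish the connection.
soul-web client
Enabling websocket synchronization in the YML configuration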
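The "full data first, increments afterwards" flow can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: FakeAdmin, the "FULL"/"DELTA" message strings, and the rule names are hypothetical and are not part of Soul; only the MYSELF request name comes from the source shown later in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SyncFlowSketch {

    /** Hypothetical stand-in for soul-admin: keeps sessions and the current configuration. */
    static final class FakeAdmin {
        private final List<Consumer<String>> sessions = new ArrayList<>();
        private final List<String> config = new ArrayList<>();

        /** Called when a client sends a message over its session. */
        void onMessage(String message, Consumer<String> session) {
            if ("MYSELF".equals(message)) {
                sessions.add(session);
                // First contact: push the full data set to this client only.
                session.accept("FULL " + config);
            }
        }

        /** A configuration change: store it and push only the delta to every session. */
        void change(String item) {
            config.add(item);
            sessions.forEach(s -> s.accept("DELTA [" + item + "]"));
        }
    }

    /** Runs the flow once and returns what the gateway side received, in order. */
    static List<String> run() {
        FakeAdmin admin = new FakeAdmin();
        admin.change("rule-1");                    // exists before the gateway starts
        List<String> received = new ArrayList<>();
        admin.onMessage("MYSELF", received::add);  // gateway connects and asks for everything
        admin.change("rule-2");                    // a later change arrives as an increment
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the gateway receives one full message followed by one incremental message, which mirrors the order described above.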
soul:
  sync:
    websocket:
      urls: ws://localhost:9095/websocket
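The urls value may list several addresses separated by commas, because the WebsocketSyncDataService constructor shown later splits it on ",". A minimal sketch of that parsing step, using only the JDK: the class name, the second address on port 9096, and the whitespace trimming are assumptions for illustration, and the real code uses StringUtils.split rather than String.split.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

final class WebsocketUrlsSketch {

    /** Splits a comma-separated urls value and keeps only the entries that parse as URIs. */
    static List<URI> parse(String urls) {
        List<URI> result = new ArrayList<>();
        for (String url : urls.split(",")) {
            String trimmed = url.trim(); // trimming is an addition of this sketch
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                result.add(new URI(trimmed));
            } catch (URISyntaxException e) {
                // Like the original, an invalid address is reported and skipped.
                System.err.println("skipping invalid websocket url: " + trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The second address is hypothetical; it stands for another soul-admin instance.
        List<URI> uris = parse("ws://localhost:9095/websocket,ws://localhost:9096/websocket");
        uris.forEach(System.out::println);
    }
}
```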
WebsocketSyncDataConfiguration
@Bean
public SyncDataService websocketSyncDataService(@NotNull final ObjectProvider<WebsocketConfig> websocketConfig,
                                               final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                               final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    // create the websocket client, falling back to defaults when a bean is missing
    return new WebsocketSyncDataService(
            websocketConfig.getIfAvailable(WebsocketConfig::new),
            pluginSubscriber.getIfAvailable(),
            metaSubscribers.getIfAvailable(Collections::emptyList),
            authSubscribers.getIfAvailable(Collections::emptyList));
}

// websocket configuration, bound from the soul.sync.websocket properties
@Bean
@ConfigurationProperties(prefix = "soul.sync.websocket")
public WebsocketConfig websocketConfig() {
    return new WebsocketConfig();
}
1. WebsocketSyncDataConfiguration is loaded through spring.factories.
2. @ConfigurationProperties binds the websocket settings to a WebsocketConfig object.
3. The websocket client (WebsocketSyncDataService) is created.
WebsocketSyncDataService
private List<WebSocketClient> clients = new ArrayList<>();

private final ScheduledThreadPoolExecutor executor;

public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
    // the urls value may contain several comma-separated websocket addresses
    String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
    // scheduled executor for the reconnect tasks, one thread per url
    executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
    // create one websocket client per url
    for (String url : urls) {
        try {
            clients.add(new SoulWebsocketClient(
                    new URI(url), Objects.requireNonNull(pluginDataSubscriber),
                    metaDataSubscribers, authDataSubscribers));
        } catch (URISyntaxException e) {
            log.error("websocket url({}) is error", url, e);
        }
    }
    try {
        for (WebSocketClient client : clients) {
            // connect to the server, waiting at most 3 seconds
            boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
            if (success) {
                log.info("websocket connection is successful.....");
            } else {
                log.error("websocket connection is error.....");
            }
            executor.scheduleAtFixedRate(() -> {
                try {
                    // the connection is closed or was never established
                    if (client.isClosed()) {
                        boolean reconnectSuccess = client.reconnectBlocking();
                        if (reconnectSuccess) {
                            log.info("websocket reconnect is successful.....");
                        } else {
                            log.error("websocket reconnection is error.....");
                        }
                    }
                } catch (InterruptedException e) {
                    log.error("websocket connect is error :{}", e.getMessage());
                }
            }, 10, 30, TimeUnit.SECONDS);
        }
    } catch (InterruptedException e) {
        log.info("websocket connection...exception....", e);
    }
}

// release resources
@Override
public void close() {
    // close the socket connections
    for (WebSocketClient client : clients) {
        if (!client.isClosed()) {
            client.close();
        }
    }
    if (Objects.nonNull(executor)) {
        executor.shutdown();
    }
}
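1. A websocket client is created for each URL and connects to the server. Because the connect call blocks, a 3-second timeout is set so that startup cannot hang.
2. isClosed() returns true when the connection is closed or was never established. A scheduled task then retries the connection: it first runs after 10 seconds and repeats every 30 seconds.
3. The class implements the AutoCloseable interface to release resources: close() closes the socket connections and shuts down the executor.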
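The periodic reconnect check from point 2 can be tried out without a real websocket. The following is a minimal sketch, assuming a hypothetical FakeClient that only tracks an open/closed flag; the delays are shortened to milliseconds for the demo, whereas the source uses a 10-second initial delay and a 30-second period.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ReconnectSketch {

    /** Hypothetical stand-in for a websocket client; it starts out closed. */
    static final class FakeClient {
        private final AtomicBoolean closed = new AtomicBoolean(true);
        private final AtomicInteger reconnects = new AtomicInteger();

        boolean isClosed() {
            return closed.get();
        }

        /** Simulates a dropped connection. */
        void drop() {
            closed.set(true);
        }

        /** Simulates a reconnect that always succeeds. */
        boolean reconnectBlocking() {
            reconnects.incrementAndGet();
            closed.set(false);
            return true;
        }

        int reconnectCount() {
            return reconnects.get();
        }
    }

    /** The periodic check: reconnect only when the client reports that it is closed. */
    static Runnable healthCheck(FakeClient client) {
        return () -> {
            if (client.isClosed()) {
                boolean ok = client.reconnectBlocking();
                System.out.println(ok ? "reconnected" : "reconnect failed");
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        FakeClient client = new FakeClient();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Shortened timings for the demo (the source uses 10 s and 30 s).
        executor.scheduleAtFixedRate(healthCheck(client), 100, 300, TimeUnit.MILLISECONDS);
        Thread.sleep(250);
        client.drop(); // the next scheduled run should notice and reconnect
        Thread.sleep(400);
        executor.shutdown();
        System.out.println("closed=" + client.isClosed() + ", reconnects=" + client.reconnectCount());
    }
}
```

Because the check is a no-op while the connection is open, running it at a fixed rate is cheap; the cost is that a dropped connection may go unnoticed for up to one period.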
SoulWebsocketClient
private volatile boolean alreadySync = Boolean.FALSE;

private final WebsocketDataHandler websocketDataHandler;

public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                           final List<MetaDataSubscriber> metaDataSubscribers,
                           final List<AuthDataSubscriber> authDataSubscribers) {
    super(serverUri);
    this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber,
            metaDataSubscribers, authDataSubscribers);
}

@Override
public void onOpen(final ServerHandshake serverHandshake) {
    if (!alreadySync) {
        // send MYSELF to the server to request the full data set
        send(DataEventTypeEnum.MYSELF.name());
        alreadySync = true;
    }
}

@Override
public void onMessage(final String result) {
    // receive a message from the server
    handleResult(result);
}

@Override
public void onClose(final int i, final String s, final boolean b) {
    this.close();
}

@Override
public void onError(final Exception e) {
    this.close();
}

@SuppressWarnings("ALL")
private void handleResult(final String result) {
    // parse the message and hand it to the handler for its config group
    WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
    ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
    String eventType = websocketData.getEventType();
    String json = GsonUtils.getInstance().toJson(websocketData.getData());
    websocketDataHandler.executor(groupEnum, json, eventType);
}
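1. Once the connection is open, the client sends a message to the server, and the server responds with the full configuration.
2. Messages from the server are received and processed: the JSON is deserialized into an object, and the matching handler is looked up in an EnumMap.
3. The handlers registered for each configuration group: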
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
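The lookup in point 2 amounts to an EnumMap keyed by the configuration group. A minimal sketch of the idea follows; the Group enum and the Handler interface are simplified stand-ins for ConfigGroupEnum and the real handler classes, and the recording lambda is purely illustrative.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

final class DispatchSketch {

    /** Simplified stand-in for ConfigGroupEnum. */
    enum Group { PLUGIN, SELECTOR, RULE }

    /** Simplified stand-in for a data handler. */
    interface Handler {
        void handle(String json, String eventType);
    }

    private final Map<Group, Handler> handlers = new EnumMap<>(Group.class);

    /** Records what each handler received, so the routing can be inspected. */
    final List<String> log = new ArrayList<>();

    DispatchSketch() {
        // Register one handler per group; here each one just records its input.
        for (Group group : Group.values()) {
            handlers.put(group, (json, eventType) -> log.add(group + ":" + eventType + ":" + json));
        }
    }

    /** Resolves the group by name and forwards the payload to its handler. */
    void dispatch(String groupType, String eventType, String json) {
        Group group = Group.valueOf(groupType); // throws IllegalArgumentException for unknown names
        handlers.get(group).handle(json, eventType);
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        sketch.dispatch("RULE", "UPDATE", "[]");
        System.out.println(sketch.log); // prints [RULE:UPDATE:[]]
    }
}
```

An EnumMap fits here because the set of keys is fixed by the enum, so no if/else chain or string comparison is needed to pick a handler.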
AbstractDataHandler
protected abstract List<T> convert(String json);

protected abstract void doRefresh(List<T> dataList);

protected abstract void doUpdate(List<T> dataList);

protected abstract void doDelete(List<T> dataList);

@Override
public void handle(final String json, final String eventType) {
    List<T> dataList = convert(json);
    if (CollectionUtils.isNotEmpty(dataList)) {
        DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
        switch (eventTypeEnum) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}
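1. handle() is a template method: it converts the JSON into a list and dispatches on the event type, while each concrete handler supplies its own refresh, update, and delete steps.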
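The template-method structure can be reproduced in a few lines. This is a sketch under stated assumptions: Event stands in for DataEventTypeEnum, NameHandler and its TreeSet cache are hypothetical, and the idea that a refresh replaces the whole cache is an assumption of this example rather than something the excerpt above confirms. The strings used as data are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class TemplateSketch {

    /** Simplified stand-in for DataEventTypeEnum. */
    enum Event { REFRESH, MYSELF, UPDATE, CREATE, DELETE }

    /** The template: the dispatch is fixed, the three steps are left to subclasses. */
    abstract static class AbstractHandler<T> {
        protected abstract void doRefresh(List<T> dataList);

        protected abstract void doUpdate(List<T> dataList);

        protected abstract void doDelete(List<T> dataList);

        final void handle(List<T> dataList, Event event) {
            if (dataList == null || dataList.isEmpty()) {
                return; // as in the original, empty payloads are ignored
            }
            switch (event) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }

    /** Hypothetical concrete handler that keeps names in a sorted in-memory cache. */
    static final class NameHandler extends AbstractHandler<String> {
        final Set<String> cache = new TreeSet<>();

        @Override
        protected void doRefresh(List<String> dataList) {
            cache.clear(); // assumption: a refresh replaces everything
            cache.addAll(dataList);
        }

        @Override
        protected void doUpdate(List<String> dataList) {
            cache.addAll(dataList);
        }

        @Override
        protected void doDelete(List<String> dataList) {
            cache.removeAll(dataList);
        }
    }

    public static void main(String[] args) {
        NameHandler handler = new NameHandler();
        handler.handle(Arrays.asList("divide", "waf"), Event.MYSELF);
        handler.handle(Arrays.asList("sign"), Event.CREATE);
        handler.handle(Arrays.asList("waf"), Event.DELETE);
        System.out.println(handler.cache); // prints [divide, sign]
    }
}
```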