soul-websocket同步資料到閘道器原始碼分析
阿新 • • 發佈:2021-01-21
通過官網資料瞭解到,soul支援websocket,http,zookeeper等同步方式。
如果是通過websocket 同步策略,則將變更後的資料主動推送給 soul-web,並且在閘道器層,會有對應的WebsocketCacheHandler 處理器處理來處 admin 的資料推送。
這列應該是官網資料沒有及時更新
最新的類是WebsocketSyncDataService
{
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
//可以看到,將url拿到之後,以urls的數量建立了ScheduledThreadPoolExecutor執行緒池。
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
//遍歷urls,建立SoulWebsocketClient,並新增到clients集合中。
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects. requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
//在最後遍歷clients集合,這裡使用了第三方的websocket包進行連線。
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
//注意這裡使用排程執行緒池進行斷線重連,30秒進行一次
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
//如果連線關閉,進行重連
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}