1. 程式人生 > 其它 >soul-websocket同步資料到閘道器原始碼分析

soul-websocket同步資料到閘道器原始碼分析

通過官網資料瞭解到,soul支援websocket,http,zookeeper等同步方式。
如果是通過websocket 同步策略,則將變更後的資料主動推送給 soul-web,並且在閘道器層,會有對應的WebsocketCacheHandler 處理器處理來處 admin 的資料推送。

這列應該是官網資料沒有及時更新
最新的類是WebsocketSyncDataService

{
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");

//可以看到,將url拿到之後,以urls的數量建立了ScheduledThreadPoolExecutor執行緒池。
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); //遍歷urls,建立SoulWebsocketClient,並新增到clients集合中。 for (String url : urls) { try { clients.add(new SoulWebsocketClient(new URI(url), Objects.
requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { //在最後遍歷clients集合,這裡使用了第三方的websocket包進行連線。
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); if (success) { log.info("websocket connection is successful....."); } else { log.error("websocket connection is error....."); } //注意這裡使用排程執行緒池進行斷線重連,30秒進行一次 executor.scheduleAtFixedRate(() -> { try { if (client.isClosed()) { //如果連線關閉,進行重連 boolean reconnectSuccess = client.reconnectBlocking(); if (reconnectSuccess) { log.info("websocket reconnect is successful....."); } else { log.error("websocket reconnection is error....."); } } } catch (InterruptedException e) { log.error("websocket connect is error :{}", e.getMessage()); } }, 10, 30, TimeUnit.SECONDS); } /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ } catch (InterruptedException e) { log.info("websocket connection...exception....", e); } }