
Soul Gateway Source Code Study 02

Tags: soul, java, gateway

Data synchronization

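In the soul gateway, configuration such as routing rules, plugin management, load-balancing strategies, and rate limiting and circuit breaking can all be changed dynamically, and changes are synchronized within seconds. Synchronization triggered by a configuration change can use websocket, http, zookeeper, nacos, and other mechanisms to keep soul-admin and soul-web in sync.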

websocket synchronization

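When soul-web starts, it opens a websocket connection to soul-admin, and soul-admin then pushes the full data set once. After that, whenever configuration data changes, soul-admin actively pushes the incremental data to soul-web over the websocket. soul-admin implements the server side on top of Tomcat's websocket support, and soul-web implements the client with the third-party java-websocket library.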

soul-web client

Enable websocket synchronization in the YML configuration

soul:
    sync:
        websocket:
            urls: ws://localhost:9095/websocket

WebsocketSyncDataConfiguration

    @Bean
    public SyncDataService websocketSyncDataService(@NotNull final ObjectProvider<WebsocketConfig> websocketConfig,
                                                    final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                    final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        // Create the websocket client
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new),
                pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList),
                authSubscribers.getIfAvailable(Collections::emptyList));
    }

    // Websocket configuration, bound from the soul.sync.websocket prefix
    @Bean
    @ConfigurationProperties(prefix = "soul.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }

1. WebsocketSyncDataConfiguration is loaded through spring.factories.
2. @ConfigurationProperties binds the websocket configuration to an entity class (WebsocketConfig).
3. The websocket client is created.

WebsocketSyncDataService

 private List<WebSocketClient> clients = new ArrayList<>();

    private final ScheduledThreadPoolExecutor executor;
    
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        // Multiple websocket addresses, separated by commas
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        // Thread pool for the scheduled reconnect tasks, one thread per URL
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        // Create one websocket client per URL
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(
                    new URI(url), Objects.requireNonNull(pluginDataSubscriber), 
                    	metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                // Connect each client, blocking for at most 3 seconds
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                executor.scheduleAtFixedRate(() -> {
                    try {
                        // isClosed() is true when the connection is closed or was never established
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }
    }

    // Release resources
    @Override
    public void close() {
        // Close the socket connections
        for (WebSocketClient client : clients) {
            if (!client.isClosed()) {
                client.close();
            }
        }
        if (Objects.nonNull(executor)) {
            executor.shutdown();
        }
    }

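1. A websocket client is created for each URL and connects to the server. Because the connect call blocks, a timeout is set so that startup cannot hang.
2. isClosed() means the connection is closed or was never established. A scheduled task retries the connection, first after 10 seconds and then every 30 seconds.
3. The class implements the AutoCloseable interface so that resources are released and the socket connections are closed.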
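
The connect-then-retry flow described in the notes above can be tried without a real websocket server. What follows is only a minimal sketch using the JDK alone: ReconnectableClient, FlakyClient and ReconnectSketch are hypothetical names made up for illustration and are not part of Soul or java-websocket. The delay and period are parameters here, whereas the original hard-codes 10 and 30 seconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a websocket client (assumption, not a Soul type).
interface ReconnectableClient {
    boolean isClosed();
    boolean reconnect();
}

// Fake client that starts closed and fails a fixed number of reconnect attempts.
final class FlakyClient implements ReconnectableClient {
    private boolean closed = true;
    private int failuresLeft;

    FlakyClient(final int failuresBeforeSuccess) {
        this.failuresLeft = failuresBeforeSuccess;
    }

    @Override
    public synchronized boolean isClosed() {
        return closed;
    }

    @Override
    public synchronized boolean reconnect() {
        if (failuresLeft > 0) {
            failuresLeft--;
            return false;
        }
        closed = false;
        return true;
    }
}

final class ReconnectSketch {
    // Split a comma-separated URL list, trimming entries and dropping blanks.
    static List<String> splitUrls(final String urls) {
        List<String> result = new ArrayList<>();
        for (String url : urls.split(",")) {
            String trimmed = url.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // One tick of the health check: reconnect only if the client is closed.
    // Returns true when the client is open after the tick.
    static boolean checkOnce(final ReconnectableClient client) {
        if (client.isClosed()) {
            return client.reconnect();
        }
        return true;
    }

    // Schedule the health check for every client; the caller must shut the executor down.
    static ScheduledThreadPoolExecutor schedule(final List<? extends ReconnectableClient> clients,
                                                final long initialDelay, final long period,
                                                final TimeUnit unit) {
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(Math.max(1, clients.size()));
        for (ReconnectableClient client : clients) {
            executor.scheduleAtFixedRate(() -> checkOnce(client), initialDelay, period, unit);
        }
        return executor;
    }
}
```

Keeping the per-tick logic in checkOnce makes it testable without waiting on timers. The scheduled variant only repeats that call, and its executor has to be shut down in the same way close() does in the original class.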

SoulWebsocketClient

 
    private volatile boolean alreadySync = Boolean.FALSE;
    private final WebsocketDataHandler websocketDataHandler;
    
    public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, 
                               final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, 
        metaDataSubscribers, authDataSubscribers);
    }
    
    @Override
    public void onOpen(final ServerHandshake serverHandshake) {
        if (!alreadySync) {
            // Send MYSELF to the server to request the full data set
            send(DataEventTypeEnum.MYSELF.name());
            alreadySync = true;
        }
    }
    
    @Override
    public void onMessage(final String result) {
        // Receive a message from the server
        handleResult(result);
    }
    
    @Override
    public void onClose(final int i, final String s, final boolean b) {
        this.close();
    }
    
    @Override
    public void onError(final Exception e) {
        this.close();
    }
    
    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        // Handle the message
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }

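1. After the connection is established, the client sends a message to the server, and the server replies with the full configuration.
2. Messages from the server are received and processed: the JSON is deserialized into an object, and the matching handler is looked up in an EnumMap.
3. The handlers registered for each configuration group: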

ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
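
To make the EnumMap lookup concrete, here is a small self-contained sketch of dispatching a message to a handler by its group. Group, GroupHandler and DispatchSketch are simplified, hypothetical stand-ins for ConfigGroupEnum, the handler interface and WebsocketDataHandler. The handlers here only record what they received, which is an assumption made so the behaviour is easy to observe.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for ConfigGroupEnum (assumption).
enum Group { PLUGIN, SELECTOR, RULE, APP_AUTH, META_DATA }

// Simplified stand-in for the per-group handler interface (assumption).
interface GroupHandler {
    void handle(String json, String eventType);
}

final class DispatchSketch {
    // EnumMap is backed by an array indexed by the enum ordinal, so lookups are cheap.
    private final Map<Group, GroupHandler> handlers = new EnumMap<>(Group.class);

    // Records every dispatched message so the routing can be inspected.
    final List<String> received = new ArrayList<>();

    DispatchSketch() {
        // Register one handler per group; here each handler just records its input.
        for (Group group : Group.values()) {
            handlers.put(group, (json, eventType) ->
                    received.add(group + ":" + eventType + ":" + json));
        }
    }

    // Look up the handler for the group and delegate to it.
    void executor(final Group group, final String json, final String eventType) {
        GroupHandler handler = handlers.get(group);
        if (handler != null) {
            handler.handle(json, eventType);
        }
    }
}
```

Adding a new configuration group then only takes a new enum constant and one more registration, with no if/else chain to extend.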

AbstractDataHandler

    protected abstract List<T> convert(String json);
    protected abstract void doRefresh(List<T> dataList);
    protected abstract void doUpdate(List<T> dataList);
    protected abstract void doDelete(List<T> dataList);

    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }

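1. This is the template method pattern: handle() fixes the overall flow, and each event type calls into the concrete subclass to apply the update.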
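
The template method above can be exercised with a toy subclass. This is only a sketch under stated assumptions: EventType, TemplateHandler and KeyValueHandler are hypothetical names, and the payload is a plain "key=value,key=value" string rather than the JSON the real handlers parse. The cache is an in-memory map standing in for whatever the subscribers update.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for DataEventTypeEnum (assumption).
enum EventType { REFRESH, MYSELF, UPDATE, CREATE, DELETE }

// Template: handle() fixes the flow, subclasses supply the four steps.
abstract class TemplateHandler<T> {
    protected abstract List<T> convert(String payload);
    protected abstract void doRefresh(List<T> dataList);
    protected abstract void doUpdate(List<T> dataList);
    protected abstract void doDelete(List<T> dataList);

    public final void handle(final String payload, final EventType eventType) {
        List<T> dataList = convert(payload);
        if (dataList.isEmpty()) {
            return; // as in the original, an empty payload is ignored
        }
        switch (eventType) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}

// Toy handler: parses "k=v,k=v" (a bare "k" means an empty value) into an in-memory cache.
final class KeyValueHandler extends TemplateHandler<Map.Entry<String, String>> {
    final Map<String, String> cache = new ConcurrentHashMap<>();

    @Override
    protected List<Map.Entry<String, String>> convert(final String payload) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String pair : payload.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (!kv[0].isEmpty()) {
                out.add(new SimpleEntry<>(kv[0], kv.length == 2 ? kv[1] : ""));
            }
        }
        return out;
    }

    @Override
    protected void doRefresh(final List<Map.Entry<String, String>> dataList) {
        cache.clear(); // a full push replaces everything
        doUpdate(dataList);
    }

    @Override
    protected void doUpdate(final List<Map.Entry<String, String>> dataList) {
        dataList.forEach(e -> cache.put(e.getKey(), e.getValue()));
    }

    @Override
    protected void doDelete(final List<Map.Entry<String, String>> dataList) {
        dataList.forEach(e -> cache.remove(e.getKey()));
    }
}
```

For example, handling "a=1,b=2" as MYSELF fills the cache with both entries. A later UPDATE with "b=3" overwrites only b, and a DELETE with "a" removes only a.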