Nacos Architecture: Configuration Center

Client-Side Processing

1. NacosPropertySourceLocator

@Override
public PropertySource<?> locate(Environment env) {
    nacosConfigProperties.setEnvironment(env);
    ConfigService configService = nacosConfigManager.getConfigService();

    if (null == configService) {
        log.warn("no instance of config service found, can't load config from nacos");
        return null;
    }
    long timeout = nacosConfigProperties.getTimeout();
    nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
    String name = nacosConfigProperties.getName();

    String dataIdPrefix = nacosConfigProperties.getPrefix();
    if (StringUtils.isEmpty(dataIdPrefix)) {
        dataIdPrefix = name;
    }

    if (StringUtils.isEmpty(dataIdPrefix)) {
        dataIdPrefix = env.getProperty("spring.application.name");
    }

    CompositePropertySource composite = new CompositePropertySource(
            NACOS_PROPERTY_SOURCE_NAME);

    // Load the shared configuration.
    loadSharedConfiguration(composite);
    // Load the extension configuration.
    loadExtConfiguration(composite);
    // Load the configuration that corresponds to the application name.
    loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
    return composite;
}

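locate() resolves the dataId prefix (the configured prefix, then the configured name, then spring.application.name) and then loads configuration in three steps:
Load the shared configuration.
Load the extension configuration.
Load the configuration that corresponds to the application name.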
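The prefix fallback in locate() can be looked at in isolation. The following is a minimal sketch, not the Spring Cloud Alibaba code: the class DataIdPrefixSketch and the method resolve are hypothetical names, and plain null/empty checks stand in for StringUtils.isEmpty.

```java
final class DataIdPrefixSketch {

    // Same fallback order as locate(): prefix, then name, then spring.application.name.
    static String resolve(String prefix, String name, String applicationName) {
        if (prefix != null && !prefix.isEmpty()) {
            return prefix;
        }
        if (name != null && !name.isEmpty()) {
            return name;
        }
        return applicationName;
    }

    public static void main(String[] args) {
        // Neither prefix nor name is set, so the application name is used.
        System.out.println(resolve(null, "", "order-service"));
    }
}
```

An explicitly configured prefix always wins, so the application name is only used when nothing else is set.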

2. NacosContextRefresher and RefreshEventListener

NacosContextRefresher:

@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    // many Spring context
    if (this.ready.compareAndSet(false, true)) {
        this.registerNacosListenersForApplications();
    }
}

private void registerNacosListenersForApplications() {
    if (isRefreshEnabled()) {
        for (NacosPropertySource propertySource : NacosPropertySourceRepository
                .getAll()) {
            if (!propertySource.isRefreshable()) {
                continue;
            }
            String dataId = propertySource.getDataId();
            registerNacosListener(propertySource.getGroup(), dataId);
        }
    }
}

private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = listenerMap.computeIfAbsent(key,
            lst -> new AbstractSharedListener() {
                @Override
                public void innerReceive(String dataId, String group,
                        String configInfo) {
                    refreshCountIncrement();
                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    // todo feature: support single refresh for listening
                    applicationContext.publishEvent(
                            new RefreshEvent(this, null, "Refresh Nacos config"));
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                group, dataId, configInfo));
                    }
                }
            });
    try {
        configService.addListener(dataKey, groupKey, listener);
    }
    catch (NacosException e) {
        log.warn(String.format(
                "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                groupKey), e);
    }
}

When a configuration changes, the registered listener publishes a RefreshEvent.
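registerNacosListener relies on computeIfAbsent so that each dataId/group pair gets exactly one listener, however often registration runs. A minimal sketch of that idea follows; ListenerRegistrySketch, its methods, and the key format are hypothetical, and a plain Consumer stands in for the Nacos Listener type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class ListenerRegistrySketch {

    private final Map<String, Consumer<String>> listenerMap = new ConcurrentHashMap<>();

    // Hypothetical key format; the real key comes from NacosPropertySourceRepository.getMapKey.
    static String mapKey(String dataId, String group) {
        return dataId + "," + group;
    }

    // Returns the listener stored for the key; a new one is stored only on the first call.
    Consumer<String> register(String dataId, String group, Consumer<String> onChange) {
        return listenerMap.computeIfAbsent(mapKey(dataId, group), key -> onChange);
    }

    int size() {
        return listenerMap.size();
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register("app.yaml", "DEFAULT_GROUP", content -> { });
        registry.register("app.yaml", "DEFAULT_GROUP", content -> { });
        System.out.println(registry.size());
    }
}
```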

RefreshEventListener:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof ApplicationReadyEvent) {
        handle((ApplicationReadyEvent) event);
    }
    else if (event instanceof RefreshEvent) {
        handle((RefreshEvent) event);
    }
}

public void handle(ApplicationReadyEvent event) {
    this.ready.compareAndSet(false, true);
}

public void handle(RefreshEvent event) {
    if (this.ready.get()) { // don't handle events before app is ready
        log.debug("Event received " + event.getEventDesc());
        // Update the configuration and apply it.
        Set<String> keys = this.refresh.refresh();
        log.info("Refresh keys changed: " + keys);
    }
}

Once the application is ready, each RefreshEvent updates the configuration and applies it.
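The ready flag in RefreshEventListener means refresh events that arrive before the application is ready are dropped rather than queued. A small sketch of that gate, with hypothetical names (ReadyGateSketch, onRefreshEvent) and a counter standing in for the actual refresh:

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ReadyGateSketch {

    private final AtomicBoolean ready = new AtomicBoolean(false);
    private int refreshCount = 0;

    void onApplicationReady() {
        ready.compareAndSet(false, true);
    }

    // Returns true if the event was handled, false if it arrived too early and was ignored.
    boolean onRefreshEvent() {
        if (!ready.get()) {
            return false;
        }
        refreshCount++; // stands in for this.refresh.refresh()
        return true;
    }

    int refreshCount() {
        return refreshCount;
    }

    public static void main(String[] args) {
        ReadyGateSketch gate = new ReadyGateSketch();
        System.out.println(gate.onRefreshEvent()); // before ready
        gate.onApplicationReady();
        System.out.println(gate.onRefreshEvent()); // after ready
    }
}
```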

3. NacosConfigService

NacosFactory:

public static ConfigService createConfigService(Properties properties) throws NacosException {
    return ConfigFactory.createConfigService(properties);
}

ConfigFactory:

public static ConfigService createConfigService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

NacosConfigService:

public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}
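ConfigFactory creates the service by class name through reflection, and the NacosConfigService constructor then receives the Properties. The sketch below shows the same pattern on stand-in types, as an illustration only: DemoService and DemoServiceImpl are hypothetical, and the serverAddr default is an assumed value.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

final class ReflectiveFactorySketch {

    // Hypothetical service type; stands in for ConfigService.
    interface DemoService {
        String serverAddr();
    }

    // Hypothetical implementation; stands in for NacosConfigService.
    static final class DemoServiceImpl implements DemoService {
        private final String serverAddr;

        public DemoServiceImpl(Properties properties) {
            this.serverAddr = properties.getProperty("serverAddr", "localhost:8848");
        }

        @Override
        public String serverAddr() {
            return serverAddr;
        }
    }

    // Loads the class by name and calls its public (Properties) constructor.
    static DemoService create(String className, Properties properties) {
        try {
            Class<?> implClass = Class.forName(className);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            // The original wraps any failure in a NacosException; a runtime exception is used here.
            throw new IllegalStateException("cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "127.0.0.1:8848");
        DemoService service = create(DemoServiceImpl.class.getName(), properties);
        System.out.println(service.serverAddr());
    }
}
```

The caller names the implementation only as a string, so it needs no compile-time reference to the implementation class.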

ClientWorker

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter

    init(properties);

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                // After an initial 1 ms delay, check every 10 ms whether the configuration has changed
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

public void checkConfigInfo() {
    // Split the listeners into tasks
    int listenerSize = cacheMap.get().size();
    // Round up to get the number of batches
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
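            // Whether the task is already running needs to be checked; this needs careful thought. The task list is currently unordered, so the change process may have problems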
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
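checkConfigInfo derives the number of long-polling tasks by dividing the listener count by the per-task batch size and rounding up. The sketch below isolates that arithmetic; TaskCountSketch is a hypothetical name, and the batch size of 3000 used in the example is an assumption, not a value taken from the listing. The division has to be floating-point, because with two integers the fraction would be lost before Math.ceil runs.

```java
final class TaskCountSketch {

    // perTaskConfigSize is a double so that the division keeps its fractional part.
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        // Assumed batch size of 3000 configurations per long-polling task.
        System.out.println(taskCount(3001, 3000));
    }
}
```

New tasks are started only when this count grows beyond currentLongingTaskCount, so existing tasks keep their task IDs.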

LongPollingRunnable

@Override
public void run() {

    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config
        // Use a long-polling request to check whether the corresponding configuration on the server has changed
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }

        // Iterate over the groupKeys that have changed
        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // For changed data, the client requests the latest content from the server
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(ct[0]);
                if (null != ct[1]) {
                    cache.setType(ct[1]);
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(ct[0]), ct[1]);
            } catch (NacosException ioe) {
                String message = String.format(
                    "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                // Trigger the listener notification
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();
        // Resubmit the current task so that polling continues
        executorService.execute(this);

    } catch (Throwable e) {

        // If the polling task fails, its next execution is delayed by a penalty time
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

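The client uses a long-polling request to check whether the corresponding configuration on the server has changed.
For data that has changed, the client requests the latest content from the server and then updates its local cache.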
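The notification step depends on comparing the MD5 of the cached content with the MD5 the listener last saw. The following is a simplified sketch of that comparison, not the Nacos CacheData class: CacheDataSketch, lastNotifiedMd5, and the received list are hypothetical, and a single listener is assumed.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

final class CacheDataSketch {

    private String content = "";
    private String md5 = md5Hex("");
    private String lastNotifiedMd5 = md5;   // MD5 the (single, assumed) listener last saw
    final List<String> received = new ArrayList<>();

    static String md5Hex(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Called after the latest content has been fetched from the server.
    void setContent(String newContent) {
        this.content = newContent;
        this.md5 = md5Hex(newContent);
    }

    // Notifies the listener only if the content's MD5 differs from the one it last saw.
    boolean checkListenerMd5() {
        if (md5.equals(lastNotifiedMd5)) {
            return false;
        }
        received.add(content);
        lastNotifiedMd5 = md5;
        return true;
    }

    public static void main(String[] args) {
        CacheDataSketch cache = new CacheDataSketch();
        cache.setContent("timeout=3000");
        System.out.println(cache.checkListenerMd5());
    }
}
```

Comparing digests rather than full contents keeps the check cheap, and writing the same content again does not trigger a second notification.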

Server-Side Long-Polling Processing

ConfigController

@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
		throws ServletException, IOException {
	request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
	String probeModify = request.getParameter("Listening-Configs");
	if (StringUtils.isBlank(probeModify)) {
		throw new IllegalArgumentException("invalid probeModify");
	}

	probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

	Map<String, String> clientMd5Map;
	try {
		// Parse the configurations the client is listening to, which may have changed, together with their MD5 values.
		clientMd5Map = MD5Util.getClientMd5Map(probeModify);
	}
	catch (Throwable e) {
		throw new IllegalArgumentException("invalid probeModify");
	}

	// Execute the long-polling request.
	inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

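Parse the configurations the client is listening to, which may have changed, together with their MD5 values.
Execute the long-polling request.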
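To make the role of clientMd5Map concrete, here is a sketch of turning a Listening-Configs string into a groupKey-to-MD5 map. It rests on assumptions that the listing does not show: that entries are laid out as dataId, group, MD5 with the control characters \u0002 between fields and \u0001 after each entry, and that the group key is dataId + "+" + group. ListeningConfigsSketch and parse are hypothetical names, not the MD5Util API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ListeningConfigsSketch {

    // Assumed separators; treat them as illustrative, not as the confirmed wire format.
    static final String WORD_SEPARATOR = "\u0002";
    static final String LINE_SEPARATOR = "\u0001";

    // Parses "dataId<WORD>group<WORD>md5<LINE>..." into groupKey -> md5.
    static Map<String, String> parse(String probeModify) {
        Map<String, String> clientMd5Map = new LinkedHashMap<>();
        for (String entry : probeModify.split(LINE_SEPARATOR)) {
            if (entry.isEmpty()) {
                continue;
            }
            String[] parts = entry.split(WORD_SEPARATOR, -1);
            if (parts.length < 3) {
                throw new IllegalArgumentException("invalid probeModify");
            }
            // Assumed group key format: dataId + "+" + group.
            clientMd5Map.put(parts[0] + "+" + parts[1], parts[2]);
        }
        return clientMd5Map;
    }

    public static void main(String[] args) {
        String probe = "app.yaml" + WORD_SEPARATOR + "DEFAULT_GROUP" + WORD_SEPARATOR
                + "abc123" + LINE_SEPARATOR;
        System.out.println(parse(probe));
    }
}
```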

ConfigServletInner

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                              Map<String, String> clientMd5Map, int probeRequestSize)
    throws IOException {

    // Long polling
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }

    // Otherwise, fall back to the short-polling logic for compatibility
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

    // Short-polling results, kept for compatibility
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);

    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);

    /**
     * Before version 2.0.4, the return value is placed in the header
     */
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }

    Loggers.AUTH.info("new content:" + newResult);

    // Disable caching
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

Only the long-polling branch is of interest here.

LongPollingService

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                 int probeRequestSize) {

    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    /**
     * Return the response 500 ms early to avoid a client-side timeout. Changed by @qiaoyi.dingqy on 2013.10.22: add delay time for LoadBalance
     */
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    
    /* Read the isFixedPolling setting */
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // do nothing but set fix polling timeout
    } else {
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                clientMd5Map.size(), probeRequestSize, changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                changedGroups.size());
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called by the HTTP thread; otherwise the container sends the response as soon as the thread leaves
    final AsyncContext asyncContext = req.startAsync();
    // The timeout of AsyncContext.setTimeout() is not accurate, so the timeout is controlled here instead
    asyncContext.setTimeout(0L);

    scheduler.execute(
        new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

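Read the client's timeout from the request. With isFixedPolling, the timed task runs after the fixed polling interval (30 seconds); otherwise it runs 500 ms before the client timeout, that is, after 29.5 seconds for a 30-second timeout. In both cases the delay is at least 10 seconds.
Unless fixed polling is enabled, compare the server-side data with the MD5 values sent by the client; if anything has changed, respond immediately.
Otherwise, wrap the client request in a ClientLongPolling, which is a timed task, and hand it to the scheduler.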
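The hold time for a non-fixed poll follows directly from the expression Math.max(10000, Long.parseLong(str) - delayTime) in addLongPollingClient. A small sketch of that calculation, with the hypothetical names HoldTimeSketch and holdMillis:

```java
final class HoldTimeSketch {

    // Client timeout minus the safety margin, but never less than 10 seconds.
    static long holdMillis(long clientTimeoutMillis, long delayMillis) {
        return Math.max(10000, clientTimeoutMillis - delayMillis);
    }

    public static void main(String[] args) {
        // A 30-second client timeout with the 500 ms margin from the listing.
        System.out.println(holdMillis(30000, 500));
    }
}
```

The margin is there so the server answers before the client's own timeout fires; the 10-second floor applies when a client sends a very short timeout.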

ClientLongPolling

@Override
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                /**
                 * Remove the subscription
                 */
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    // Compare MD5 values to decide whether the groupKeys requested by the client have changed
                    List<String> changedGroups = MD5Util.compareMd5(
                        (HttpServletRequest)asyncContext.getRequest(),
                        (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                    if (changedGroups.size() > 0) {
                        // Return the changed keys to the client
                        sendResponse(changedGroups);
                    } else {
                        sendResponse(null);
                    }
                } else {
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - createTime),
                        "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                        "polling",
                        clientMd5Map.size(), probeRequestSize);
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
            }

        }

    }, timeoutTime, TimeUnit.MILLISECONDS);

    // Add the ClientLongPolling instance to the queue
    allSubs.add(this);
}

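Add the ClientLongPolling instance to the allSubs queue.
When the timed task fires, first remove the ClientLongPolling instance from the queue.
With fixed polling, compare MD5 values to decide whether any groupKey requested by the client has changed, and return the changed keys to the client; otherwise the timeout simply returns an empty result.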

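Long polling means that the server does not respond immediately after receiving a request. If nothing has changed, it holds the request and returns the result to the client only after 29.5 seconds. As a result, the client and server stay connected for up to 30 seconds when no data changes.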
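The hold-then-respond behaviour can be sketched without a servlet container. In the sketch below a CompletableFuture stands in for the held AsyncContext response, and a scheduled task completes it with an empty result at the timeout unless a change completes it first. LongPollHoldSketch, hold, and pushChange are hypothetical names; this is an illustration of the pattern, not the LongPollingService code.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class LongPollHoldSketch {

    // Daemon thread so a pending timeout does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    // Holds a request: the future completes with no changes once holdMillis has elapsed.
    CompletableFuture<List<String>> hold(long holdMillis) {
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        scheduler.schedule(() -> {
            response.complete(Collections.emptyList());
        }, holdMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    // Answers a held request early; has no effect if it has already been answered.
    static boolean pushChange(CompletableFuture<List<String>> response, String groupKey) {
        return response.complete(Collections.singletonList(groupKey));
    }

    public static void main(String[] args) {
        LongPollHoldSketch server = new LongPollHoldSketch();
        CompletableFuture<List<String>> held = server.hold(29500);
        pushChange(held, "app.yaml+DEFAULT_GROUP");
        System.out.println(held.join());
    }
}
```

Because a future can be completed only once, whichever of the timeout and the change notification comes first decides the response.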

LongPollingService

When configuration data changes on the server, a LocalDataChangeEvent is published.

@Override
public void onEvent(Event event) {
    if (isFixedPolling()) {
        // ignore
    } else {
        if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
        }
    }
}

The event handler runs a DataChangeTask.

@Override
public void run() {
    try {
        ConfigCacheService.getContentBetaMd5(groupKey);
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // For a beta release, skip clients that are not in the beta list
                if (isBeta && !betaIps.contains(clientSub.ip)) {
                    continue;
                }

                // For a tag release, skip clients whose tag does not match
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }

                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                iter.remove(); // Remove the subscription
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    (System.currentTimeMillis() - changeTime),
                    "in-advance",
                    RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
                    "polling",
                    clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
    }
}

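Iterate over the client long-polling requests held in allSubs.
Check the groupKeys carried by each request; if the configuration that changed on the server is one the client is watching, return that groupKey to the client immediately.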
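The traversal in DataChangeTask can be sketched on plain collections. The sketch assumes each held request records the client IP and the set of groupKeys it watches; DataChangeSketch, Sub, and onChange are hypothetical names, tag filtering is left out, and returning the notified IPs stands in for calling sendResponse.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

final class DataChangeSketch {

    // Hypothetical stand-in for a held ClientLongPolling request.
    static final class Sub {
        final String ip;
        final Set<String> watchedKeys;

        Sub(String ip, Set<String> watchedKeys) {
            this.ip = ip;
            this.watchedKeys = watchedKeys;
        }
    }

    final Queue<Sub> allSubs = new ConcurrentLinkedQueue<>();

    // Removes every matching subscription and returns the IPs that would be answered.
    List<String> onChange(String groupKey, boolean isBeta, Set<String> betaIps) {
        List<String> notifiedIps = new ArrayList<>();
        for (Iterator<Sub> iter = allSubs.iterator(); iter.hasNext(); ) {
            Sub sub = iter.next();
            if (!sub.watchedKeys.contains(groupKey)) {
                continue; // this client does not watch the changed configuration
            }
            if (isBeta && !betaIps.contains(sub.ip)) {
                continue; // beta release: skip clients outside the beta list
            }
            iter.remove(); // the held request is answered, so drop the subscription
            notifiedIps.add(sub.ip);
        }
        return notifiedIps;
    }
}
```

Subscriptions that do not match stay in the queue and are answered later, either by another change or by their own timeout.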