Nacos架構篇 - 配置中心
技術標籤:# Nacos
客戶端處理
一、NacosPropertySourceLocator
@Override
public PropertySource<?> locate(Environment env) {
nacosConfigProperties.setEnvironment(env);
ConfigService configService = nacosConfigManager.getConfigService();
if (null == configService) {
log.warn("no instance of config service found, can't load config from nacos" );
return null;
}
long timeout = nacosConfigProperties.getTimeout();
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME) ;
// 載入共享配置。
loadSharedConfiguration(composite);
// 載入拓展配置。
loadExtConfiguration(composite);
// 載入應用名稱對應的配置。
loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
return composite;
}
載入共享配置。
載入拓展配置。
載入應用名稱對應的配置。
二、NacosContextRefresher、RefreshEventListener
NacosContextRefresher:
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// many Spring context
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// todo feature: support single refresh for listening
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
配置發生變更時,釋出一個RefreshEvent事件。
RefreshEventListener:
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
handle((ApplicationReadyEvent) event);
}
else if (event instanceof RefreshEvent) {
handle((RefreshEvent) event);
}
}
public void handle(ApplicationReadyEvent event) {
this.ready.compareAndSet(false, true);
}
public void handle(RefreshEvent event) {
if (this.ready.get()) { // don't handle events before app is ready
log.debug("Event received " + event.getEventDesc());
// 完成配置的更新和應用。
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
完成配置的更新和應用。
三、NacosConfigService
NacosFactory:
public static ConfigService createConfigService(Properties properties) throws NacosException {
return ConfigFactory.createConfigService(properties);
}
ConfigFactory:
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
initNamespace(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// 每隔10ms,檢查一次配置是否發生了變化
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
public void checkConfigInfo() {
// 分任務
int listenerSize = cacheMap.get().size();
// 向上取整為批數
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判斷任務是否在執行 這塊需要好好想想。 任務列表現在是無序的。變化過程可能有問題
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
LongPollingRunnable
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
// 通過長輪詢請求檢查服務端對應的配置是否發生了變更
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
// 遍歷存在變更的groupKey
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// 對於發生變更的資料,客戶端向服務端請求對應的最新資料
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// 觸發事件通知
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
// 繼續定時執行當前任務
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
通過長輪詢請求檢查服務端對應的配置是否發生了變更。
對於發生變更的資料,客戶端向服務端請求對應的最新資料,然後更新本地資料。
服務端長輪詢處理
ConfigController
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
// 獲取客戶端需要監聽的可能會發生變化的配置,並且計算MD5值。
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
}
catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// 執行長輪詢請求。
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
獲取客戶端需要監聽的可能會發生變化的配置,並且計算MD5值。
執行長輪詢請求。
ConfigServletInner
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {
// 長輪詢
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// else 相容短輪詢邏輯
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// 相容短輪詢result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
/**
* 2.0.4版本以前, 返回值放入header中
*/
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
Loggers.AUTH.info("new content:" + newResult);
// 禁用快取
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
這裡僅關注長輪詢的方法。
LongPollingService
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回響應,為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
/* 獲取isFixedPolling引數值 */
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP執行緒呼叫,否則離開後容器會立即傳送響應
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超時時間不準,所以只能自己控制
asyncContext.setTimeout(0L);
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
獲取客戶端請求的超時時間,如果是isFixedPolling,定時任務在30秒之後執行;否則,定時任務在29.5秒後執行。
將服務端的資料與客戶端傳入的資料進行MD5對比,如果發生過變化,直接返回。
將客戶端的請求封裝成ClientLongPolling,也就是一個定時任務,交給排程器排程執行。
ClientLongPolling
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 刪除訂閱關係
*/
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
// 通過比較MD5值,判斷客戶端請求的groupKeys是否發生變更
List<String> changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
// 將變更的結果返回給客戶端
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
// 將ClientLongPolling例項新增到佇列中
allSubs.add(this);
}
將ClientLongPolling例項新增到佇列中。
定時任務執行後,先將ClientLongPolling例項從佇列中移除。
通過比較MD5值,判斷客戶端請求的groupKeys是否發生變更,將變更的結果返回給客戶端。
所謂的長輪詢就是服務端接收請求之後,不立即返回,而是延後29.5秒才把請求結果返回給客戶端。這也使得客戶端與服務端之間在30秒之內沒有發生資料變化的情況下一直處於連線狀態。
LongPollingService
服務端的配置資料發生變化時,會發佈一個LocalDataChangeEvent事件。
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
執行DataChangeTask。
@Override
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta釋出且不在beta列表直接跳過
if (isBeta && !betaIps.contains(clientSub.ip)) {
continue;
}
// 如果tag釋出且不在tag列表直接跳過
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // 刪除訂閱關係
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - changeTime),
"in-advance",
RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
"polling",
clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
}
}
遍歷allSubs中的客戶端長輪詢請求。
比較每一個客戶端長輪詢請求攜帶的groupKey,如果服務端變更的配置和客戶端請求關注的配置一致,則將這些一致的groupKey返回。