1. 程式人生 > >SpringCloud Alibaba Nacos註冊中心原始碼淺析

SpringCloud Alibaba Nacos註冊中心原始碼淺析

# 一、前置瞭解 ## 1.1 簡介 Nacos是一款阿里巴巴推出的一款微服務發現、配置管理框架。我們本次對將對它的服務註冊發現功能進行簡單原始碼分析。 ## 1.2 流程 Nacos的分析分為兩部分,一部分是我們的客戶端(將自己註冊到Nacos),另一部分是Nacos Server處理我們的註冊請求等。 ## 1.3 要分析demo示例 細節篇幅不多展示,大致如下 ### 1.3.1 客戶端方面: 引入了pom依賴 ``` com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery ``` 並在application.yml配置好nacos地址(本地),我們的這個應用啟動後會向Nacos服務端去註冊。 ### 1.3.2 Nacos服務端方面 我們從https://github.com/alibaba/nacos,即Nacos的官網github按tag拉下原始碼到本地。 ![](https://img2020.cnblogs.com/blog/1085149/202101/1085149-20210125151506312-165369523.png) 會有很多模組:address、api、client、cmdb、core、console等等。 ![](https://img2020.cnblogs.com/blog/1085149/202101/1085149-20210125151524874-904562430.png) 從console裡的Nacos.java檔案啟動即可,它是個SpringBoot應用,啟動後就可以處理註冊等請求了。 # 二、Nacos客戶端原始碼流程 ## 2.1 自動配置觸發邏輯入口 開啟客戶端引入的依賴包的pom,只引入了spring-cloud-alibaba-nacos-discovery: ![](https://img2020.cnblogs.com/blog/1085149/202101/1085149-20210125151537009-269863984.png) SpringCloud系列都是通過spring.factories檔案進行自動配置,我們開啟spring-cloud-alibaba-nacos-discovery的spring.factories檔案: ![](https://img2020.cnblogs.com/blog/1085149/202101/1085149-20210125151546681-633876256.png) 去看看NacosDiscoveryAutoConfiguration這個名字的,名字可以看出它是和自動註冊發現相關的配置類: ```java @Configuration @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class }) public class NacosDiscoveryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } } ``` 註冊了三個Bean,各個Bean名字也是見名知義,上面兩個是服務與nacos註冊邏輯本身,最後一個Auto的才是自動配置相關的,應該是入口。 開啟NacosAutoServiceRegistration原始碼,會發現它的父類AbstractAutoServiceRegistration實現了ApplicationListener介面,一般很多框架都是通過監聽spring事件機制然後開始運作各自的原始碼邏輯,開啟ApplicationListener介面的重寫方法看看: ```java public abstract class AbstractAutoServiceRegistration implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener { //略*** @Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } ``` 註冊入口應該就是這裡,bind方法開始執行nacos自己的邏輯,bind方法: ```java public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); //略 this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } ``` start: ```java public void start() { //略 if (!this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } ``` 這裡就可以發現自動配置觸發的註冊方法了,register();,後續就是如何註冊了! ## 2.2 客戶端註冊邏輯 register() 不斷跟進剛剛的多個register()重名方法,會來到真正的register方法,如下: ```java public void register(Registration registration) { //略 String serviceId = registration.getServiceId(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, instance); //略 } catch (Exception e) { //略 } } ``` 邏輯比較直接,主要是獲取服務id(比如服務名啥的)+這個例項的具體資訊(封裝成Instance),最後通過namingService去註冊,跟進註冊: ```java public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { //判斷是否是臨時節點 if (instance.isEphemeral()) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); //略 beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); } ``` ### 心跳機制 其實這裡可以看出如果不是臨時節點是不需要傳送心跳訊息的,這裡心跳機制是通過beatReactor.addBeatInfo裡內部的一個定時任務去實現的,核心就是內部的: ```java long result = serverProxy.sendBeat(beatInfo); long nextTime = result > 0 ? result : beatInfo.getPeriod(); executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); ``` 通過執行緒池跑任務,定時訪問Nacos服務端的/instance/beat介面,傳送HTTP請求 表示自己活著 ### 繼續看註冊 剛剛registerInstance裡的 ``` serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); ``` 繼續跟進: ``` public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map params = new HashMap(9); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST); } ``` 其實就是準備引數準備發http請求了哈,註冊的介面地址是NACOS_URL_INSTANCE,也就是:/instance的post請求 客戶端註冊總結: 1.通過SpringCloud一貫使用的spring.factories檔案進行自動配置 2.自動配置類將自己注入IOC容器,並實現了ApplicationListener介面,在web容器初始化事件釋出之後載入自己的邏輯 3.載入註冊邏輯,通過傳送http請求到/instance介面將本身的資訊發給Nacos服務端,以及心跳任務定時傳送,告訴自己活著 # 三、Nacos服務端處理註冊 上面有說到nacos客戶端註冊是通過傳送http請求到/instance介面。我們看看/instance介面做了什麼。Nacos服務端的controller原始碼如下: ```java @RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { //...略 @CanDistro @PostMapping public String register(HttpServletRequest request) throws Exception { String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request)); return "ok"; } } ``` 跟進裡面的serviceManager.registerInstance註冊方法: ```java public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); } ``` createEmptyService是要在放入instance例項(即註冊的那個節點資訊)之前確保service存在,不存在則建立一個,之後就可以通過getService取出來了。最後再通過addInstance繼續註冊 看看createEmptyService是怎麼建立的,什麼結構? ## 3.1 createEmptyService建立保證Service 通過斷點不斷跟進createEmptyService方法原始碼,會來到ServiceManager.java的putService方法: ```java public void putService(Service service) { if (!serviceMap.containsKey(service.getNamespaceId())) { synchronized (putServiceLock) { if (!serviceMap.containsKey(service.getNamespaceId())) { serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16)); } } } serviceMap.get(service.getNamespaceId()).put(service.getName(), service); } ``` 最後是放到到一個serviceMap的Map結構去了,如下: ```java private Map> serviceMap = new ConcurrentHashMap<>(); ``` 雙層Map,內部含義其實是: ```java Map>//第一層key是namespace,第二層裡才是name和service ``` 實際上放入map之後,還會把service初始化,呼叫init方法,內部會執行健康檢查: 1.某個例項超過15秒沒收到心跳則把它的healthy屬性設定為false 2.繼續超過30秒沒收到心跳就會直接剔除這個例項 ## 3.2 addInstance註冊 回到前面的註冊地方,最後保證了有Service之後繼續走主邏輯,addInstance: ``` addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); ``` 跟進 ```java public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { List instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } } ``` 最後是執行consistencyService.put(key, instances);註冊,這裡會有兩個實現DistroConsistencyServiceImpl和RaftConsistencyServiceImpl,分別對應著註冊中心的AP實現和CP實現,一個基於記憶體優先可用性(A),一個基於磁碟優先一致性(C),是CAP理論裡的取捨。CAP具體可看:https://baike.baidu.com/item/CAP%E5%8E%9F%E5%88%99/5712863?fr=aladdin # 四、Nacos服務端AP模式實現:DistroConsistencyServiceImpl Nacos的AP模式採用distro協議,Distro是阿里的自創協議,Distro 協議被定位為 **臨時資料的一致性協議** 繼續看之前的原始碼,註冊最後是來到: ```java consistencyService.put(key, instances); ``` 跟進: ```java @Override public void put(String key, Record value) throws NacosException { //1.將註冊例項更新到記憶體登錄檔 onPut(key, value); //2.同步例項資訊到Nacos Server叢集其它節點 taskDispatcher.addTask(key); } ``` 如加的註釋這樣,分了兩步實現 ## 4.1 onPut將註冊例項更新到記憶體登錄檔 跟進onPut原始碼: ```java public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { //封裝資料節點儲存 Datum datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //只要傳key就拿到上面的節點去更新了 notifier.addTask(key, ApplyAction.CHANGE); } ``` 這裡也看到了最後notifier.addTask運用了生產者消費者的思想,裡面是新增一個任務到阻塞佇列中去,等著處理,因為這些操作本身不需要立即返回成功,對提升效能有很大幫助。 傳了ApplyAction.CHANGE型別,我們跟進notifier.addTask,會發現是在Notifier內部類裡,它是多執行緒Runnable的實現類,邏輯都在run方法裡,等著對應的執行緒調起執行。 ```java public class Notifier implements Runnable { //略部分程式碼 @Override public void run() { while (true) { try { //略部分程式碼 for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == ApplyAction.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == ApplyAction.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { //略 } } //略 } catch (Throwable e) { //略 } } } } ``` 判斷是剛才我們傳的ApplyAction.CHANGE會去執行listener.onChange,這裡有多個實現,我們可以通過打斷點進入的是com.alibaba.nacos.naming.core.Service類中 ```java public void onChange(String key, Instances value) throws Exception { //略 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); } ``` 核心就是updateIPs: ```java public void updateIPs(Collection instances, boolean ephemeral) { Map> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJSON()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry> entry : ipMap.entrySet()) { //make every ip mine List entryIPs = entry.getValue(); clusterMap.get(entry.getKey()).updateIPs(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIPAddr()).append("_").append(instance.isHealthy()).append(","); } } ``` 為了防止讀寫併發衝突,方法第一句直接建立了一個新的HashMap,然後去操作新的HashMap,操作完了之後再去替換老的Map資料,CopyOnWrite的思想。 Eureka防止讀寫衝突用的是多級快取結構,多級快取定時同步,客戶端感知及時性不如Nacos。 最後還發布了服務變化事件 ## 4.2 同步例項資訊到Nacos Server叢集其它節點 回到之前的程式碼,put方法中是taskDispatcher.addTask(key);進行同步資訊到叢集其它節點,跟進程式碼: ```java public void addTask(String key) { queue.offer(key); } ``` 就是把節點的key加入到阻塞佇列中了,等待之後執行,這是內部類TaskScheduler裡的方法,看看整體: ```java public class TaskScheduler implements Runnable { //略 public void addTask(String key) { queue.offer(key); } @Override public void run() { List keys = new ArrayList<>(); while (true) { try { String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS); if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) { Loggers.DISTRO.debug("got key: {}", key); } if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) { continue; } if (StringUtils.isBlank(key)) { continue; } if (dataSize == 0) { keys = new ArrayList<>(); } keys.add(key); dataSize++; if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) { for (Server member : dataSyncer.getServers()) { if (NetUtils.localServer().equals(member.getKey())) { continue; } SyncTask syncTask = new SyncTask(); syncTask.setKeys(keys); syncTask.setTargetServer(member.getKey()); if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) { Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask)); } dataSyncer.submit(syncTask, 0); } lastDispatchTime = System.currentTimeMillis(); dataSize = 0; } } catch (Exception e) { Loggers.DISTRO.error("dispatch sync task failed.", e); } } } } ``` 可以看到if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) 達到一定是數量或時間差,就開始提交批量傳送同步任務。邏輯在同步類DataSyncer的run方法裡,裡面就是往/distro/datum介面傳送資料同步。 # 五、Nacos服務端CP模式實現:RaftConsistencyServiceImpl Nacos主要是AP模式,CP模式的RaftConsistencyServiceImpl具體就不展開了,這裡只簡單介紹一下大概實現方式: 1.是阿里自己實現的CP模式的簡單raft協議 2.判斷自己是Leader節點的話才執行邏輯,否則轉發給Leader 3.同步更新例項資料到磁碟,非同步更新記憶體登錄檔 4.用CountDownLatch實現,必須叢集半數以上節點寫入成功才返回客戶端成功 5.成功後呼叫/raft/datum/commit介面提交 # 六、服務發現 客戶端通過呼叫/instance/list介面獲取服務端map相關資料,並且會有個延時執行的定時任務去不斷更新最新服