nacos上的註冊過的服務例項掉線分析
阿新 • • 發佈:2021-09-04
最近生產上的xxl_job框架的一個執行器(nacos客戶端)因為分配記憶體不大,導致頻繁與nacos服務端的連線斷開,而斷開之後雖然客戶端服務沒有宕掉,但是就是無法重新註冊到nacos的服務端上去。
基於以上情況,我試著從nacos客戶端註冊與心跳檢測方面跟一下原始碼。
1:首先開啟nacos官網,查詢心跳方面的介紹知識
官方文件:https://nacos.io/zh-cn/docs/what-is-nacos.html ,在open-api模組可以找到「傳送例項心跳」介面(/instance/beat)。
然後使用idea開啟nacos原始碼(在github上下載),全域性搜尋/instance/beat,找到傳送心跳的方法
1 public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { 2 3 if (NAMING_LOGGER.isDebugEnabled()) { 4 NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString()); 5 } 6 Map<String, String> params = new HashMap<String, String>(8); 7 String body = StringUtils.EMPTY; 8 if (!lightBeatEnabled) { 9 try { 10 body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8"); 11 } catch (UnsupportedEncodingException e) { 12 throw new NacosException(NacosException.SERVER_ERROR, "encode beatInfo error", e); 13 } 14 } 15 params.put(CommonParams.NAMESPACE_ID, namespaceId); 16 params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); 17 params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); 18 params.put("ip", beatInfo.getIp()); 19 params.put("port", String.valueOf(beatInfo.getPort())); 20 String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, HttpMethod.PUT); 21 return JSON.parseObject(result); 22 }
然後在服務端找到nacos服務端接收到客戶端心跳後操作邏輯如下:
1 @CanDistro 2 @PutMapping("/beat") 3 @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) 4 public JSONObject beat(HttpServletRequest request) throws Exception { 5 6 JSONObject result = new JSONObject(); 7 8 result.put("clientBeatInterval", switchDomain.getClientBeatInterval()); 9 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); 10 String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, 11 Constants.DEFAULT_NAMESPACE_ID); 12 String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, 13 UtilsAndCommons.DEFAULT_CLUSTER_NAME); 14 String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); 15 int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); 16 String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); 17 18 RsInfo clientBeat = null; 19 if (StringUtils.isNotBlank(beat)) { 20 clientBeat = JSON.parseObject(beat, RsInfo.class); 21 } 22 23 if (clientBeat != null) { 24 if (StringUtils.isNotBlank(clientBeat.getCluster())) { 25 clusterName = clientBeat.getCluster(); 26 } else { 27 // fix #2533 28 clientBeat.setCluster(clusterName); 29 } 30 ip = clientBeat.getIp(); 31 port = clientBeat.getPort(); 32 } 33 34 if (Loggers.SRV_LOG.isDebugEnabled()) { 35 Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); 36 } 37 38 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); 39 40 if (instance == null) { 41 if (clientBeat == null) { 42 result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); 43 return result; 44 } 45 instance = new Instance(); 46 instance.setPort(clientBeat.getPort()); 47 instance.setIp(clientBeat.getIp()); 48 instance.setWeight(clientBeat.getWeight()); 49 instance.setMetadata(clientBeat.getMetadata()); 50 instance.setClusterName(clusterName); 51 instance.setServiceName(serviceName); 52 instance.setInstanceId(instance.getInstanceId()); 53 
instance.setEphemeral(clientBeat.isEphemeral()); 54 55 serviceManager.registerInstance(namespaceId, serviceName, instance); 56 } 57 58 Service service = serviceManager.getService(namespaceId, serviceName); 59 60 if (service == null) { 61 throw new NacosException(NacosException.SERVER_ERROR, 62 "service not found: " + serviceName + "@" + namespaceId); 63 } 64 if (clientBeat == null) { 65 clientBeat = new RsInfo(); 66 clientBeat.setIp(ip); 67 clientBeat.setPort(port); 68 clientBeat.setCluster(clusterName); 69 } 70 service.processClientBeat(clientBeat); 71 72 result.put(CommonParams.CODE, NamingResponseCode.OK); 73 result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval()); 74 result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); 75 return result; 76 }
接下來,我們跟一下service.processClientBeat(clientBeat)這個方法,看看裡面的業務邏輯到底是怎樣的?
/**
 * Wraps the received heartbeat in a {@code ClientBeatProcessor} bound to this service and
 * hands it to {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo the heartbeat data received from the client
 */
public void processClientBeat(final RsInfo rsInfo) {
    final ClientBeatProcessor beatTask = new ClientBeatProcessor();
    beatTask.setService(this);
    beatTask.setRsInfo(rsInfo);

    HealthCheckReactor.scheduleNow(beatTask);
}
再跟一下:
/**
 * Applies one client heartbeat to the matching instance(s) of the beat's cluster.
 *
 * <p>Every instance whose ip and port equal those of the beat gets its last-beat timestamp
 * refreshed. If such an instance is not marked and is currently unhealthy, it is switched
 * back to healthy and the push service is notified that the service changed.
 */
@Override
public void run() {
    Service targetService = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = targetService.getClusterMap().get(clusterName);

    for (Instance candidate : cluster.allIPs(true)) {
        boolean sameEndpoint = candidate.getIp().equals(ip) && candidate.getPort() == port;
        if (!sameEndpoint) {
            continue;
        }

        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
        }
        candidate.setLastBeat(System.currentTimeMillis());

        // Only an unmarked, currently unhealthy instance is flipped back to healthy.
        if (candidate.isMarked() || candidate.isHealthy()) {
            continue;
        }

        candidate.setHealthy(true);
        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
        getPushService().serviceChanged(targetService);
    }
}
到這裡,我發現nacos服務端接收到客戶端服務例項的心跳後,主要是做了心跳的最新時間更新和健康狀態更新。接著我去nacos相關的資料庫表裡面看了一下相關資訊。
看一下刪除例項的邏輯:
@Override public void run() { try { if (!getDistroMapper().responsible(service.getName())) { return; } if (!getSwitchDomain().isHealthCheckEnabled()) { return; } List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service); SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance)); deleteIP(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } }