1. 程式人生 > 其它 >nacos上的註冊過的服務例項掉線分析

nacos上的註冊過的服務例項掉線分析

最近生產上的xxl_job框架的一個執行器(nacos客戶端)因為分配記憶體不大,導致頻繁與nacos服務端的連線斷開,而斷開之後雖然客戶端服務沒有宕掉,但是就是無法重新註冊到nacos的服務端上去。

基於以上情況,我試著從nacos客戶端註冊與心跳檢測方面跟一下原始碼。

1:首先開啟nacos官網,查詢心跳方面的介紹知識

https://nacos.io/zh-cn/docs/what-is-nacos.html,在open-api模組找到傳送例項心跳介面

然後使用idea開啟nacos原始碼(在github上下載),全域性搜尋/instance/beat,找到傳送心跳的方法

 1   public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
 2 
 3         if (NAMING_LOGGER.isDebugEnabled()) {
 4             NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
 5         }
 6         Map<String, String> params = new HashMap<String, String>(8);
 7         String body = StringUtils.EMPTY;
 8         if (!lightBeatEnabled) {
 9             try {
10                 body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8");
11             } catch (UnsupportedEncodingException e) {
12                 throw new NacosException(NacosException.SERVER_ERROR, "encode beatInfo error", e);
13             }
14         }
15         params.put(CommonParams.NAMESPACE_ID, namespaceId);
16         params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
17         params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
18         params.put("ip", beatInfo.getIp());
19         params.put("port", String.valueOf(beatInfo.getPort()));
20         String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, HttpMethod.PUT);
21         return JSON.parseObject(result);
22     }

然後在nacos服務端找到接收到客戶端心跳後的處理邏輯,如下:

 1   @CanDistro
 2     @PutMapping("/beat")
 3     @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
 4     public JSONObject beat(HttpServletRequest request) throws Exception {
 5 
 6         JSONObject result = new JSONObject();
 7 
 8         result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
 9         String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
10         String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
11             Constants.DEFAULT_NAMESPACE_ID);
12         String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
13             UtilsAndCommons.DEFAULT_CLUSTER_NAME);
14         String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
15         int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
16         String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
17 
18         RsInfo clientBeat = null;
19         if (StringUtils.isNotBlank(beat)) {
20             clientBeat = JSON.parseObject(beat, RsInfo.class);
21         }
22 
23         if (clientBeat != null) {
24             if (StringUtils.isNotBlank(clientBeat.getCluster())) {
25                 clusterName = clientBeat.getCluster();
26             } else {
27                 // fix #2533
28                 clientBeat.setCluster(clusterName);
29             }
30             ip = clientBeat.getIp();
31             port = clientBeat.getPort();
32         }
33 
34         if (Loggers.SRV_LOG.isDebugEnabled()) {
35             Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
36         }
37 
38         Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
39 
40         if (instance == null) {
41             if (clientBeat == null) {
42                 result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
43                 return result;
44             }
45             instance = new Instance();
46             instance.setPort(clientBeat.getPort());
47             instance.setIp(clientBeat.getIp());
48             instance.setWeight(clientBeat.getWeight());
49             instance.setMetadata(clientBeat.getMetadata());
50             instance.setClusterName(clusterName);
51             instance.setServiceName(serviceName);
52             instance.setInstanceId(instance.getInstanceId());
53             instance.setEphemeral(clientBeat.isEphemeral());
54 
55             serviceManager.registerInstance(namespaceId, serviceName, instance);
56         }
57 
58         Service service = serviceManager.getService(namespaceId, serviceName);
59 
60         if (service == null) {
61             throw new NacosException(NacosException.SERVER_ERROR,
62                 "service not found: " + serviceName + "@" + namespaceId);
63         }
64         if (clientBeat == null) {
65             clientBeat = new RsInfo();
66             clientBeat.setIp(ip);
67             clientBeat.setPort(port);
68             clientBeat.setCluster(clusterName);
69         }
70         service.processClientBeat(clientBeat);
71 
72         result.put(CommonParams.CODE, NamingResponseCode.OK);
73         result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
74         result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
75         return result;
76     }

接下來,我們跟一下service.processClientBeat(clientBeat)這個方法,看看裡面的業務邏輯到底是怎樣的?

    /**
     * Wraps the given heartbeat in a {@code ClientBeatProcessor} bound to this service and
     * submits it through {@code HealthCheckReactor.scheduleNow}.
     *
     * @param rsInfo heartbeat data handed to the processor
     */
    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor beatTask = new ClientBeatProcessor();
        beatTask.setRsInfo(rsInfo);
        beatTask.setService(this);
        HealthCheckReactor.scheduleNow(beatTask);
    }

再跟一下ClientBeatProcessor的run方法:

 @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }

        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);

        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }

到這裡,我發現nacos服務端接收到客戶端服務例項的資訊後,主要是做了心跳的最新時間更新和健康狀態更新。接著,我去nacos相關的資料庫表裡面看了一下相關資訊。

再看一下服務端檢查心跳超時並刪除例項的邏輯:

 @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {

                if (instance.isMarked()) {
                    continue;
                }

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                    deleteIP(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }