A look at nacos's HealthCheckCommon

This article takes a look at HealthCheckCommon in nacos.

HealthCheckCommon

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

@Component
public class HealthCheckCommon {

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private static LinkedBlockingDeque<HealthCheckResult> healthCheckResults = new LinkedBlockingDeque<>(1024 * 128);

    private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.taobao.health-check.notifier");
            return thread;
        }
    });

    public void init() {
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                // Snapshot the queued results, then empty the queue
                List list = Arrays.asList(healthCheckResults.toArray());
                healthCheckResults.clear();

                List<Server> sameSiteServers = serverListManager.getServers();

                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }

                for (Server server : sameSiteServers) {
                    // Skip the local server
                    if (server.getKey().equals(NetUtils.localServer())) {
                        continue;
                    }
                    Map<String, String> params = new HashMap<>(10);
                    params.put("result", JSON.toJSONString(list));
                    if (Loggers.SRV_LOG.isDebugEnabled()) {
                        Loggers.SRV_LOG.debug("[HEALTH-SYNC] server: {}, healthCheckResults: {}", server, JSON.toJSONString(list));
                    }

                    HttpClient.HttpResult httpResult = HttpClient.httpPost("http://" + server.getKey()
                        + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                        + "/api/healthCheckResult", null, params);

                    if (httpResult.code != HttpURLConnection.HTTP_OK) {
                        Loggers.EVT_LOG.warn("[HEALTH-CHECK-SYNC] failed to send result to {}, result: {}", server, JSON.toJSONString(list));
                    }
                }
            }
        }, 500, TimeUnit.MILLISECONDS);
    }

    //......

    public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) {
        //......
    }

    public void checkOK(Instance ip, HealthCheckTask task, String msg) {
        //......
    }

    public void checkFail(Instance ip, HealthCheckTask task, String msg) {
        //......
    }

    public void checkFailNow(Instance ip, HealthCheckTask task, String msg) {
        //......
    }

    //......
}
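  • HealthCheckCommon's init method schedules a one-shot task with a 500 ms delay that takes the queued healthCheckResults and POSTs them to /api/healthCheckResult on every other server in the list; beyond that, the class mainly provides the reEvaluateCheckRT, checkOK, checkFail, and checkFailNow methods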
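
In the listing, the task snapshots the deque with toArray() and then calls clear() as two separate steps, so a result queued between the two calls would be cleared without ever being sent. The sketch below shows one possible alternative, drainTo, which moves the queued elements out in a single call. ResultBatcher and its methods are hypothetical names for illustration and are not part of nacos.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical helper (not a nacos class): collects results for one sync round.
class ResultBatcher<T> {
    private final LinkedBlockingDeque<T> queue;

    ResultBatcher(int capacity) {
        this.queue = new LinkedBlockingDeque<>(capacity);
    }

    // offer() returns false instead of blocking when the deque is full.
    boolean add(T result) {
        return queue.offer(result);
    }

    // Moves everything currently queued into a new list with one drainTo call.
    List<T> drain() {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    int pending() {
        return queue.size();
    }
}

class ResultBatcherDemo {
    public static void main(String[] args) {
        ResultBatcher<String> batcher = new ResultBatcher<>(2);
        batcher.add("a");
        batcher.add("b");
        boolean accepted = batcher.add("c"); // rejected: capacity is 2
        System.out.println(accepted + " " + batcher.drain() + " " + batcher.pending());
    }
}
```

Whether the small window between toArray() and clear() matters in practice depends on how often results are produced; the sketch only illustrates the difference between the two approaches.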

reEvaluateCheckRT

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void reEvaluateCheckRT(long checkRT,HealthCheckTask task,SwitchDomain.HealthParams params) {
        task.setCheckRTLast(checkRT);

        if (checkRT > task.getCheckRTWorst()) {
            task.setCheckRTWorst(checkRT);
        }

        if (checkRT < task.getCheckRTBest()) {
            task.setCheckRTBest(checkRT);
        }

        checkRT = (long) ((params.getFactor() * task.getCheckRTNormalized()) + (1 - params.getFactor()) * checkRT);

        if (checkRT > params.getMax()) {
            checkRT = params.getMax();
        }

        if (checkRT < params.getMin()) {
            checkRT = params.getMin();
        }

        task.setCheckRTNormalized(checkRT);
    }
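  • reEvaluateCheckRT first records checkRTLast, then updates checkRTWorst and checkRTBest when the new value is worse or better than the stored one; it then recomputes checkRT as factor * checkRTNormalized + (1 - factor) * checkRT, clamps the result to the [min, max] range from params, and stores it as the new checkRTNormalized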
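
To make the smoothing step concrete, here is a small stand-alone model of the weighted blend followed by the clamp. RtSmoother is a hypothetical class for illustration, and the values 0.75, 500, 5000 and 2000 are arbitrary sample inputs, not nacos defaults.

```java
// Hypothetical stand-alone model of the smoothing step; not the nacos class itself.
class RtSmoother {
    private final double factor; // weight given to the previous normalized value
    private final long min;
    private final long max;
    private long normalized;

    RtSmoother(double factor, long min, long max, long initial) {
        this.factor = factor;
        this.min = min;
        this.max = max;
        this.normalized = initial;
    }

    // Blends the new sample with the previous normalized value, then clamps to [min, max].
    long update(long checkRT) {
        long blended = (long) (factor * normalized + (1 - factor) * checkRT);
        if (blended > max) {
            blended = max;
        }
        if (blended < min) {
            blended = min;
        }
        normalized = blended;
        return blended;
    }
}

class RtSmootherDemo {
    public static void main(String[] args) {
        RtSmoother smoother = new RtSmoother(0.75, 500, 5000, 2000);
        System.out.println(smoother.update(100));    // 0.75 * 2000 + 0.25 * 100 = 1525
        System.out.println(smoother.update(100000)); // blend exceeds max, clamped to 5000
    }
}
```

With a factor close to 1 the normalized value moves slowly toward new samples, so a single very slow or very fast check has limited effect.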

checkOK

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void checkOK(Instance ip,HealthCheckTask task,String msg) {
        Cluster cluster = task.getCluster();

        try {
            if (!ip.isHealthy() || !ip.isMockValid()) {
                if (ip.getOKCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
                    if (distroMapper.responsible(cluster,ip)) {
                        ip.setHealthy(true);
                        ip.setMockValid(true);

                        Service service = cluster.getService();
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        pushService.serviceChanged(service);
                        addResult(new HealthCheckResult(service.getName(),ip));

                        Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{},region: {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),UtilsAndCommons.LOCALHOST_SITE,msg);
                    } else {
                        if (!ip.isMockValid()) {
                            ip.setMockValid(true);
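                            Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-ENABLED} valid: {}:{}@{},region: {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),UtilsAndCommons.LOCALHOST_SITE,msg);
                        }
                    }
                } else {
                    Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-ENABLED} pre-valid: {}:{}@{} in {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),ip.getOKCount(),msg);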
                }
            }
        } catch (Throwable t) {
            Loggers.SRV_LOG.error("[CHECK-OK] error when close check task.",t);
        }

        ip.getFailCount().set(0);
        ip.setBeingChecked(false);
    }
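  • checkOK acts only on an instance that is not healthy or not mockValid: once its OKCount reaches switchDomain.getCheckTimes() and this server is responsible for the instance, it sets healthy and mockValid to true, publishes the change through pushService.serviceChanged, and adds a HealthCheckResult to healthCheckResults; a server that is not responsible only sets mockValid to true. In every case the fail count is reset to 0 and beingChecked is cleared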
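
The branch structure of checkOK can be summarized as a small decision function. The sketch below is a hypothetical model (OkOutcome and OkBranch are not nacos types); the outcome names simply reuse the POS, PROBE and OTHER tags that appear in the log messages of the listing.

```java
// Hypothetical model of the checkOK branches; names mirror the log tags in the listing.
enum OkOutcome { POS, PROBE, OTHER, NONE }

class OkBranch {
    // okCount is the value after incrementAndGet(); responsible models distroMapper.responsible(...).
    static OkOutcome classify(boolean healthy, boolean mockValid, int okCount,
                              int checkTimes, boolean responsible) {
        if (healthy && mockValid) {
            return OkOutcome.NONE;  // nothing to recover
        }
        if (okCount < checkTimes) {
            return OkOutcome.OTHER; // not enough consecutive successes yet
        }
        if (responsible) {
            return OkOutcome.POS;   // flip healthy and mockValid, push the change
        }
        return mockValid ? OkOutcome.NONE : OkOutcome.PROBE; // only mockValid is flipped
    }
}

class OkBranchDemo {
    public static void main(String[] args) {
        System.out.println(OkBranch.classify(false, false, 1, 3, true));
        System.out.println(OkBranch.classify(false, false, 3, 3, true));
        System.out.println(OkBranch.classify(false, false, 3, 3, false));
    }
}
```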

checkFail

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void checkFail(Instance ip,HealthCheckTask task,String msg) {
        Cluster cluster = task.getCluster();

        try {
            if (ip.isHealthy() || ip.isMockValid()) {
                if (ip.getFailCount().incrementAndGet() >= switchDomain.getCheckTimes()) {
                    if (distroMapper.responsible(cluster,ip)) {
                        ip.setHealthy(false);
                        ip.setMockValid(false);

                        Service service = cluster.getService();
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        addResult(new HealthCheckResult(service.getName(),ip));

                        pushService.serviceChanged(service);

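                        Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{},region: {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),UtilsAndCommons.LOCALHOST_SITE,msg);
                    } else {
                        Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-DISABLED} invalid: {}:{}@{},region: {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),UtilsAndCommons.LOCALHOST_SITE,msg);
                    }

                } else {
                    Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-DISABLED} pre-invalid: {}:{}@{} in {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),ip.getFailCount(),msg);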
                }
            }
        } catch (Throwable t) {
            Loggers.SRV_LOG.error("[CHECK-FAIL] error when close check task.",t);
        }

        ip.getOKCount().set(0);

        ip.setBeingChecked(false);
    }
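  • checkFail acts only on an instance that is healthy or mockValid: once its FailCount reaches switchDomain.getCheckTimes() and this server is responsible for the instance, it sets healthy and mockValid to false, adds a HealthCheckResult to healthCheckResults, and publishes the change through pushService.serviceChanged; a server that is not responsible only logs. In every case the OK count is reset to 0 and beingChecked is cleared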
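
Because checkOK resets the fail count and checkFail resets the OK count, a state change needs checkTimes consecutive results of the same kind. The following is a simplified model of that counter interplay; InstanceState and ThresholdChecker are hypothetical names, and the model deliberately leaves out mockValid and the distroMapper.responsible check.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified instance state (no mockValid, no distro responsibility).
class InstanceState {
    boolean healthy;
    final AtomicInteger okCount = new AtomicInteger();
    final AtomicInteger failCount = new AtomicInteger();
}

class ThresholdChecker {
    private final int checkTimes;

    ThresholdChecker(int checkTimes) {
        this.checkTimes = checkTimes;
    }

    // A success only counts toward recovery while the instance is unhealthy.
    void onOk(InstanceState s) {
        if (!s.healthy && s.okCount.incrementAndGet() >= checkTimes) {
            s.healthy = true;
        }
        s.failCount.set(0); // any success breaks a run of failures
    }

    // A failure only counts toward removal while the instance is healthy.
    void onFail(InstanceState s) {
        if (s.healthy && s.failCount.incrementAndGet() >= checkTimes) {
            s.healthy = false;
        }
        s.okCount.set(0); // any failure breaks a run of successes
    }
}

class ThresholdCheckerDemo {
    public static void main(String[] args) {
        ThresholdChecker checker = new ThresholdChecker(3);
        InstanceState state = new InstanceState();
        checker.onOk(state);
        checker.onOk(state);
        checker.onFail(state); // resets the OK run
        checker.onOk(state);
        checker.onOk(state);
        System.out.println(state.healthy); // still false: only two consecutive successes
        checker.onOk(state);
        System.out.println(state.healthy); // true after the third consecutive success
    }
}
```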

checkFailNow

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckCommon.java

    public void checkFailNow(Instance ip,HealthCheckTask task,String msg) {
        Cluster cluster = task.getCluster();
        try {
            if (ip.isHealthy() || ip.isMockValid()) {
                if (distroMapper.responsible(cluster,ip)) {
                    ip.setHealthy(false);
                    ip.setMockValid(false);

                    Service service = cluster.getService();
                    service.setLastModifiedMillis(System.currentTimeMillis());

                    pushService.serviceChanged(service);
                    addResult(new HealthCheckResult(service.getName(),ip));

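                    Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{},region: {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),UtilsAndCommons.LOCALHOST_SITE,msg);
                } else {
                    if (ip.isMockValid()) {
                        ip.setMockValid(false);
                        Loggers.EVT_LOG.info("serviceName: {} {PROBE} {IP-DISABLED} invalid-now: {}:{}@{},region: {},msg: {}",cluster.getService().getName(),ip.getIp(),ip.getPort(),cluster.getName(),UtilsAndCommons.LOCALHOST_SITE,msg);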
                    }

                }
            }
        } catch (Throwable t) {
            Loggers.SRV_LOG.error("[CHECK-FAIL-NOW] error when close check task.",t);
        }

        ip.getOKCount().set(0);
        ip.setBeingChecked(false);
    }
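  • checkFailNow also acts only on an instance that is healthy or mockValid, but it does not wait for the fail count to reach checkTimes: a responsible server immediately sets healthy and mockValid to false, publishes the change through pushService.serviceChanged, and adds a HealthCheckResult to healthCheckResults; unlike checkFail, a server that is not responsible for the instance immediately sets mockValid to false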
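
The difference between the two failure paths can be sketched with a simplified state object. ProbeState and FailHandlers are hypothetical names; the responsible flag stands in for distroMapper.responsible(cluster, ip), and pushing and logging are omitted.

```java
// Hypothetical, simplified state: starts healthy and mockValid.
class ProbeState {
    boolean healthy = true;
    boolean mockValid = true;
    int failCount;
}

class FailHandlers {
    // Threshold-based path: flags change only after checkTimes failures, and only if responsible.
    static void checkFail(ProbeState s, boolean responsible, int checkTimes) {
        if (s.healthy || s.mockValid) {
            if (++s.failCount >= checkTimes) {
                if (responsible) {
                    s.healthy = false;
                    s.mockValid = false;
                }
                // not responsible: the listing only logs in this branch
            }
        }
    }

    // Immediate path: no threshold; a non-responsible server still clears mockValid.
    static void checkFailNow(ProbeState s, boolean responsible) {
        if (s.healthy || s.mockValid) {
            if (responsible) {
                s.healthy = false;
                s.mockValid = false;
            } else if (s.mockValid) {
                s.mockValid = false;
            }
        }
    }
}

class FailHandlersDemo {
    public static void main(String[] args) {
        ProbeState viaCheckFail = new ProbeState();
        for (int i = 0; i < 3; i++) {
            FailHandlers.checkFail(viaCheckFail, false, 3);
        }
        ProbeState viaCheckFailNow = new ProbeState();
        FailHandlers.checkFailNow(viaCheckFailNow, false);
        System.out.println(viaCheckFail.mockValid + " " + viaCheckFailNow.mockValid);
    }
}
```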

Summary

HealthCheckCommon's init method schedules a delayed task that syncs healthCheckResults to the other servers; the class mainly provides the reEvaluateCheckRT, checkOK, checkFail, and checkFailNow methods.