A look at processClientBeat in nacos Service

This article examines the processClientBeat method of the nacos Service class.

Service.processClientBeat

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    /**
     * TODO set customized push expire time:
     */
    private long pushCacheMillis = 0L;

    private Map<String, Cluster> clusterMap = new HashMap<>();

    //......

    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }

    //......
}
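  • Service's processClientBeat method creates a ClientBeatProcessor, sets the service and the RsInfo on it, and hands it to HealthCheckReactor.scheduleNow, so the beat is processed asynchronously on the health-check thread pool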
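
The hand-off described above can be illustrated with a minimal sketch. It is not nacos code: the class BeatHandOffSketch, the method processBeat, and the LAST_PROCESSED field are hypothetical names, and a plain String stands in for RsInfo. It only shows the shape of the pattern, which is to wrap the beat in a Runnable, schedule it with zero delay, and return to the caller right away.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the beat hand-off; none of these names come from nacos.
class BeatHandOffSketch {

    // One scheduler thread, marked as a daemon so it never blocks JVM exit.
    static final ScheduledExecutorService EXECUTOR =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "beat-handoff-sketch");
                t.setDaemon(true);
                return t;
            });

    // Records the last beat that the worker thread processed.
    static final AtomicReference<String> LAST_PROCESSED = new AtomicReference<>();

    // Same shape as processClientBeat: build the Runnable, schedule it
    // with zero delay, and return without waiting for it to run.
    static ScheduledFuture<?> processBeat(String beatInfo) {
        Runnable processor = () -> LAST_PROCESSED.set(beatInfo);
        return EXECUTOR.schedule(processor, 0, TimeUnit.MILLISECONDS);
    }
}
```

The caller only pays for creating the Runnable and enqueuing it; the actual beat handling happens later on the scheduler thread.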

ClientBeatProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java

public class ClientBeatProcessor implements Runnable {
    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    private RsInfo rsInfo;
    private Service service;

    @JSONField(serialize = false)
    public PushService getPushService() {
        return SpringContext.getAppContext().getBean(PushService.class);
    }

    public RsInfo getRsInfo() {
        return rsInfo;
    }

    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }

    public Service getService() {
        return service;
    }

    public void setService(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}",rsInfo.toString());
        }

        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);

        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}",rsInfo.toString());
                }
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{},region: {},msg: client beat ok",cluster.getService().getName(),ip,port,cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
}
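  • ClientBeatProcessor implements the Runnable interface. Its run method iterates over the cluster's instances and updates lastBeat on the instance that matches the given ip and port; if that instance is not marked and its healthy flag is false, it sets healthy to true and publishes a change event through getPushService().serviceChanged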
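
The refresh rule can be isolated from logging and Spring lookups in a small sketch. It assumes a simplified, hypothetical Inst class with only the fields the loop touches, and it returns a count of recovered instances in place of calling the push service; neither is part of nacos.

```java
import java.util.List;

// Hypothetical, simplified model of the beat-refresh loop; not nacos code.
class BeatRefreshSketch {

    // Holds only the fields the loop reads or writes.
    static class Inst {
        final String ip;
        final int port;
        boolean healthy;
        boolean marked;
        long lastBeat;

        Inst(String ip, int port, boolean healthy, boolean marked) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
            this.marked = marked;
        }
    }

    // Refreshes lastBeat on every instance matching ip and port, and flips
    // unmarked, unhealthy matches back to healthy. The return value counts
    // those flips, i.e. the points where the original publishes a change.
    static int refresh(List<Inst> instances, String ip, int port, long now) {
        int recovered = 0;
        for (Inst inst : instances) {
            if (inst.ip.equals(ip) && inst.port == port) {
                inst.lastBeat = now;
                if (!inst.marked && !inst.healthy) {
                    inst.healthy = true;
                    recovered++;
                }
            }
        }
        return recovered;
    }
}
```

Note that a marked instance still has its lastBeat refreshed; only the health flip and the change notification are skipped for it.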

HealthCheckReactor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java

public class HealthCheckReactor {

    private static final ScheduledExecutorService EXECUTOR;

    private static Map<String,ScheduledFuture> futureMap = new ConcurrentHashMap<>();

    static {

        int processorCount = Runtime.getRuntime().availableProcessors();
        EXECUTOR
                = Executors
                .newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2,new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.alibaba.nacos.naming.health");
                        return thread;
                    }
                });
    }

    public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
        task.setStartTime(System.currentTimeMillis());
        return EXECUTOR.schedule(task,task.getCheckRTNormalized(),TimeUnit.MILLISECONDS);
    }

    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(),EXECUTOR.scheduleWithFixedDelay(task,5000,5000,TimeUnit.MILLISECONDS));
    }

    public static void cancelCheck(ClientBeatCheckTask task) {
        ScheduledFuture scheduledFuture = futureMap.get(task.taskKey());
        if (scheduledFuture == null) {
            return;
        }
        try {
            scheduledFuture.cancel(true);
        } catch (Exception e) {
            Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!",e);
        }
    }


    public static ScheduledFuture<?> scheduleNow(Runnable task) {
        return EXECUTOR.schedule(task,0,TimeUnit.MILLISECONDS);
    }
}
  • HealthCheckReactor creates EXECUTOR in a static initializer block. It provides scheduleCheck methods for HealthCheckTask and ClientBeatCheckTask, a cancelCheck method for ClientBeatCheckTask, and a scheduleNow method for any Runnable
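
The keyed schedule and cancel idea behind futureMap can be sketched on its own. This is a variant under stated assumptions, not the nacos implementation: it uses computeIfAbsent, so a duplicate key never schedules a second task, and it removes the key on cancel, whereas the quoted code uses putIfAbsent and leaves the entry in the map. The class KeyedSchedulerSketch and the boolean return values are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of a keyed periodic scheduler; not nacos code.
class KeyedSchedulerSketch {

    static final ScheduledExecutorService EXECUTOR =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "keyed-scheduler-sketch");
                t.setDaemon(true);
                return t;
            });

    static final Map<String, ScheduledFuture<?>> FUTURES = new ConcurrentHashMap<>();

    // Schedules the task once per key. Returns true only when this call
    // actually created the periodic task.
    static boolean scheduleCheck(String key, Runnable task, long delayMillis) {
        boolean[] created = {false};
        FUTURES.computeIfAbsent(key, k -> {
            created[0] = true;
            return EXECUTOR.scheduleWithFixedDelay(
                    task, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    // Cancels and forgets the task for the key. Returns false when no
    // task was registered under that key.
    static boolean cancelCheck(String key) {
        ScheduledFuture<?> future = FUTURES.remove(key);
        return future != null && future.cancel(true);
    }
}
```

The design choice here is that the argument to putIfAbsent is evaluated before the map is consulted, while the mapping function passed to computeIfAbsent runs only when the key is missing.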

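Summary

  • Service's processClientBeat method creates a ClientBeatProcessor and hands it to HealthCheckReactor.scheduleNow for asynchronous processing
  • ClientBeatProcessor implements the Runnable interface. Its run method iterates over the cluster's instances and updates lastBeat on the instance that matches the given ip and port; if that instance is not marked and its healthy flag is false, it sets healthy to true and publishes a change event through getPushService().serviceChanged
  • HealthCheckReactor creates EXECUTOR in a static initializer block. It provides scheduleCheck methods for HealthCheckTask and ClientBeatCheckTask, a cancelCheck method for ClientBeatCheckTask, and a scheduleNow method for any Runnable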
