1. 程式人生 > >Nacos Naming原始碼分析(四)- 活躍檢測

Nacos Naming原始碼分析(四)- 活躍檢測

臨時服務例項在註冊到naming server上之後,會週期性得傳送心跳資訊來保持節點的活躍。同時,naming server會週期性檢測每個例項最後一次收到心跳資訊的時間戳,摘除超時的節點並通知所有訂閱的客戶端。

例項活躍性檢測的定時任務封裝在Service類中,在init方法裡啟動:

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { 
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this); 
    public void init() { 
        // 啟動檢測任務 
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask); 
        ... 
    } 
}

ClientBeatCheckTask會獲取當前服務所有的臨時節點並一一檢測該節點是否超時:

public class ClientBeatCheckTask implements Runnable {
    
    public void run() {
        try {
            // 當前服務不由本節點操作,則跳過
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
    
            // 所有臨時節點
            List<Instance> instances = service.allIPs(true);
    
            // first set health status of instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // 設定為下線
                            instance.setHealthy(false);
                            // 通知訂閱客戶端
                            getPushService().serviceChanged(service.getNamespaceId(), service.getName());
                        }
                    }
                }
            }
    
            // ....
    
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) {
                    // delete instance
                    deleteIP(instance);
                }
            }
    
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
}

client通過呼叫/beat這一api來發送心跳資訊,該請求在InstanceController.beat(..)方法中被處理,後續呼叫Service.processClientBeat(..)方法:

// Service類
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor會更新instance的lastBeat資訊:

// ClientBeatProcessor
public void run() {
    Service service = this.service;    
    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            // 更新心跳時間
            instance.setLastBeat(System.currentTimeMillis());
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
                }
            }
        }
    }