Nacos Naming原始碼分析(四)- 活躍檢測
阿新 • • 發佈:2019-08-08
臨時服務例項在註冊到naming server上之後,會週期性得傳送心跳資訊來保持節點的活躍。同時,naming server會週期性檢測每個例項最後一次收到心跳資訊的時間戳,摘除超時的節點並通知所有訂閱的客戶端。
例項活躍性檢測的定時任務封裝在Service類中,在init方法裡啟動:
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this); public void init() { // 啟動檢測任務 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); ... } }
ClientBeatCheckTask會獲取當前服務所有的臨時節點並一一檢測該節點是否超時:
public class ClientBeatCheckTask implements Runnable { public void run() { try { // 當前服務不由本節點操作,則跳過 if (!getDistroMapper().responsible(service.getName())) { return; } // 所有臨時節點 List<Instance> instances = service.allIPs(true); // first set health status of instances: for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT) { if (!instance.isMarked()) { if (instance.isHealthy()) { // 設定為下線 instance.setHealthy(false); // 通知訂閱客戶端 getPushService().serviceChanged(service.getNamespaceId(), service.getName()); } } } } // .... // then remove obsolete instances: for (Instance instance : instances) { if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) { // delete instance deleteIP(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } } }
client通過呼叫/beat這一api來發送心跳資訊,該請求在InstanceController.beat(..)方法中被處理,後續呼叫Service.processClientBeat(..)方法:
// Service類 public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor); }
ClientBeatProcessor會更新instance的lastBeat資訊:
// ClientBeatProcessor
public void run() {
Service service = this.service;
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
// 更新心跳時間
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {
if (!instance.isHealthy()) {
instance.setHealthy(true);
getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
}
}
}
}