1. 程式人生 > >Nacos服務心跳和健康檢查原始碼介紹

Nacos服務心跳和健康檢查原始碼介紹

服務心跳

Nacos Client會維護一個定時任務通過持續呼叫服務端的介面更新心跳時間,保證自己處於存活狀態,防止服務端將服務剔除,Nacos預設5秒向服務端傳送一次,通過請求服務端介面/instance/beat傳送心跳。
客戶端服務在註冊服務的時候會增加一個心跳的任務,如下圖所示:


首先看下BeatInfo這個類,重點看標註的欄位,該欄位是給週期任務設定時間,如下圖:

該方法內部定義的一個DEFAULT_HEART_BEAT_INTERVAL的常量,設定5秒:

接下來我們看下addBeatInfo方法,該方法內部主要是將BeatTask任務加入到執行緒池ScheduledExecutorService當中,如下圖:

重點部分就是看BeatTask,BeatTask繼承Runnable,run方法就是我們的重點,該方法呼叫了NamingProxy的sendBeat方法,服務端請求地址為/instance/beat的方法


接下來我們把目光放到服務端,找到InstanceController的beat方法,如果是引數beat資訊的話,說明是第一次發起心跳,則會帶有服務例項資訊,因為發起心跳成功則服務端會返回下次不要帶beat資訊的引數,這樣客戶端第二次就不會攜帶beat資訊了。如果發現沒有該服務,又沒帶beat資訊,說明這個服務可能被移除過了,直接返回沒找到。如果沒有服務,但是發現有beat資訊,那就從beat中獲取服務例項資訊,進行註冊,整體執行流程如下圖:
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //設定心跳間隔
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    //判斷有無心跳內容
    //如果存在心跳內容則不是輕量級心跳就轉化為RsInfo
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    String clusterName = WebUtils
            .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) {
        if (StringUtils.isNotBlank(clientBeat.getCluster())) {
            clusterName = clientBeat.getCluster();
        } else {
            // fix #2533
            clientBeat.setCluster(clusterName);
        }
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    }
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
    //獲取例項的資訊
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    //如果例項不存在
    if (instance == null) {
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
        //根據您心跳內容建立一個例項資訊
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());
        //註冊例項
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    //獲取服務的資訊
    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    //不存在的話,要建立一個進行處理
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    //開啟心跳檢查任務
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    //5秒間隔
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    //告訴客戶端不需要帶上心跳資訊了,變成輕量級心跳了
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
View Code

接下來我們看一下processClientBeat方法,該方法將ClientBeatProcessor放入到執行緒池中,接下來我們看下重點看下run方法,


該方法內部主要就是更新對應例項下心跳時間,整體上如下圖:

至此完成了從客戶端到服務端更新例項的心跳時間,下圖是整體的時序圖:

服務的健康檢查

Nacos Server會開啟一個定時任務來檢查註冊服務的健康情況,對於超過15秒沒收到客戶端的心跳例項會將它的 healthy屬性置為false,此時當客戶端不會將該例項的資訊發現,如果某個服務的例項超過30秒沒收到心跳,則剔除該例項,如果剔除的例項恢復,傳送心跳則會恢復。
當有例項註冊的時候,我們會看到有個service.init()的方法,該方法的實現主要是將ClientBeatCheckTask加入到執行緒池當中,如下圖:


ClientBeatCheckTask中的run方法主要做兩件事心跳時間超過15秒則設定該例項資訊為不健康狀況和心跳時間超過30秒則刪除該例項資訊,如下程式碼:
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }

        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        //獲取服務所有例項資訊
        List<Instance> instances = service.allIPs(true);

        // first set health status of instances:
        for (Instance instance : instances) {
            //如果心跳時間超過15秒則設定該例項資訊為不健康狀況
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG
                                .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                        instance.getIp(), instance.getPort(), instance.getClusterName(),
                                        service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                        instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        for (Instance instance : instances) {

            if (instance.isMarked()) {
                continue;
            }
            //如果心跳時間超過30秒則刪除該例項資訊
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }

    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }

}
View Code

首先我們來看一下deleteIp方法,該方法內部主要通過構建刪除請求,傳送刪除請求,如下圖:


刪除例項的介面如下圖:

內部通過呼叫ServiceManager的removeInstance方法,如下圖:

重點看下substractIpAddresses內部通過呼叫updateIpAddresses,該方法內部主要就是移除到超過30秒的例項資訊,如下圖:

到此完成刪除例項的過程,整體的時序圖如下:

接下來我們看標記不健康時候的程式碼,這部分程式碼在客戶端註冊的時候也出現相同的程式碼,只是我們略過了,這部分也是觀察者模式的重要體現,從這裡我們可以學習到的東西在於結合Spring的事件機制,輕鬆實現觀察者模式,當然這個裡面也有部分我感覺寫的不太好,哈哈,大佬們看到勿噴。

首先我們看serviceChanged方法,該方法主要是釋出一個服務不健康的事件,如下圖:

接下來我們看下如何處理這個事件,這個時候涉及PushService這個類,整體的繼承結構如下圖:

我們看到該類的繼承ApplicationListener介面,該介面是一個支援泛型的介面,傳入了ServiceChangeEvent的類,此處就是對事件的處理,如下圖:

接下來看一下onApplicationEvent方法,這個方法主要完成了準備資料,傳送資料這幾件事情:
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            //獲取所有需要推送的客戶端
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                //超時的不刪除跳過處理
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;

                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                //準備UDP資料
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                //傳送資料
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

        } finally {
            //傳送完成刪除
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }

    }, 1000, TimeUnit.MILLISECONDS);
    //增加待推送的任務
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}
View Code

接下里我們重點看下udpPush的方法,整個方法主要是通過一個Map物件來記錄UDP請求,如果沒收到就重試傳送請求,整體如下:

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    //如果大於最大的嘗試次數
    //移除傳送的資料和待確認的key
    //失敗推送的次數+1
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }

    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        //記錄UDP請求的返回資訊
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        //傳送UDP請求
        udpSocket.send(ackEntry.origin);

        ackEntry.increaseRetryTime();
        //如果UDP沒收到返回資訊 每10秒嘗試一下
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;

        return null;
    }
}
View Code

服務端有傳送,那麼客戶端就有接收的,接收部分我理解上是服務發現部分,這裡我們就不做過多介紹,待下一篇再來聊聊。

結束

歡迎大家點點關注,點點贊,感謝!