
Eureka Series (8): How Service Eviction Is Implemented

Overview of the service eviction flow

The diagram below gives a simple picture of the overall eviction flow:

Source code analysis of service eviction

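First, we need to know when the scheduled eviction task is initialized and started. After some searching, I found that it is initialized in the initEurekaServerContext method of the EurekaBootStrap class, which runs when the Eureka Server starts up. Let's look at the source: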

protected void initEurekaServerContext() throws Exception {
        // ... other code omitted
        registry.openForTraffic(applicationInfoManager, registryCount);
        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

Inside initEurekaServerContext(), the call to registry.openForTraffic(applicationInfoManager, registryCount) is what initializes the eviction task. Let's check the source to confirm:

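// Override in Spring Cloud's InstanceRegistry: falls back to a default count when count is 0
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
   super.openForTraffic(applicationInfoManager,
         count == 0 ? this.defaultOpenForTrafficCount : count);
}

// Parent implementation in Eureka's PeerAwareInstanceRegistryImpl
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {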
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

At the end of openForTraffic, the parent class's postInit() method is called. Let's look at postInit next:

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // Start the scheduled task; by default it runs every 60 seconds and evicts instances whose leases have expired without renewal
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

As shown above, Eureka uses evictionTimer.schedule to set up a scheduled task that runs every 60 seconds by default.
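The cancel-then-reschedule pattern in postInit() can be tried out in isolation. The following is a minimal sketch, not Eureka's code: the class name EvictionSchedulingSketch, the start/stop methods, and the ranAtLeast helper are all made up for illustration, and a 50 ms interval stands in for the 60 s default so the demo finishes quickly.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the registry class; only the scheduling shape matches postInit().
class EvictionSchedulingSketch {
    // Daemon timer so the JVM can still exit; the thread name is illustrative.
    private final Timer evictionTimer = new Timer("sketch-eviction-timer", true);
    private final AtomicReference<TimerTask> evictionTaskRef = new AtomicReference<>();

    /** Cancels any previously scheduled task, then schedules a fresh one. */
    void start(long intervalMs, Runnable evictAction) {
        TimerTask previous = evictionTaskRef.get();
        if (previous != null) {
            previous.cancel();
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    evictAction.run();
                } catch (Throwable t) {
                    // An uncaught exception would terminate the Timer thread, so swallow and log.
                    System.err.println("Could not run the evict task: " + t);
                }
            }
        };
        evictionTaskRef.set(task);
        // First run after intervalMs, then repeated with a fixed delay of intervalMs.
        evictionTimer.schedule(task, intervalMs, intervalMs);
    }

    void stop() {
        evictionTimer.cancel();
    }

    /** Demo helper: returns true if the task ran at least `runs` times within timeoutMs. */
    static boolean ranAtLeast(int runs, long intervalMs, long timeoutMs) {
        EvictionSchedulingSketch sketch = new EvictionSchedulingSketch();
        CountDownLatch latch = new CountDownLatch(runs);
        sketch.start(intervalMs, latch::countDown);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran twice: " + ranAtLeast(2, 50, 5000));
    }
}
```

Keeping the current task in an AtomicReference is what lets a second call to start() cancel the old task before scheduling the new one, so two eviction tasks never run side by side.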
Next, let's look at the EvictionTask class, which carries out the actual eviction.

@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

Let's move on to the implementation of evict():

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // Collect the expired leases first, then evict them
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}

As we can see, evict() is where instances are finally evicted.
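To make the eviction limit concrete, here is a small worked sketch of the arithmetic and the random pick used in evict(). It is an illustration only: the class EvictionLimitSketch and its method names are hypothetical, and 0.85 is assumed as the renewal percent threshold (which I believe is the default).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper class; the arithmetic mirrors the evict() method shown above.
class EvictionLimitSketch {
    /** Maximum number of leases a single eviction pass may remove. */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /** Picks min(expired.size(), limit) items at random with a partial Fisher-Yates (Knuth) shuffle. */
    static <T> List<T> pickToEvict(List<T> expired, int limit, Random random) {
        List<T> pool = new ArrayList<>(expired); // copy so the caller's list stays untouched
        int toEvict = Math.max(0, Math.min(pool.size(), limit));
        for (int i = 0; i < toEvict; i++) {
            // Swap a random not-yet-chosen element into position i.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }

    public static void main(String[] args) {
        // 100 registered instances with a threshold of 0.85: at most 15 may go in one pass.
        int limit = evictionLimit(100, 0.85);
        List<String> expired = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            expired.add("instance-" + i); // made-up instance ids
        }
        List<String> picked = pickToEvict(expired, limit, new Random());
        System.out.println("limit=" + limit + ", evicted this pass=" + picked.size());
    }
}
```

With 30 expired leases and a limit of 15, only half are removed in this pass, and the rest wait for the next run. Because the victims are chosen at random, no single application loses all of its instances first.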

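Note:
Eureka's self-preservation mechanism affects service eviction and can prevent instances that are already considered offline from being evicted. The next part of this series covers the self-preservation mechanism.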

You may be wondering how Eureka decides that an instance is no longer available. Let's look at lease.isExpired(additionalLeaseMs), the method used to make that decision.


    /**
     * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
     *
     * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
     * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
     * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
     * not be fixed.
     *
     * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
     */
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

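As shown above, Eureka uses lastUpdateTimestamp, the time of the last update, to decide whether a service is still available. You may remember this from the part on lease renewal: every time a client calls the server's renewal endpoint, the server updates the lastUpdateTimestamp of that instance. As a reminder, this is the code that updates lastUpdateTimestamp on renewal: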

   /**
    * Renew the lease, use renewal duration if it was specified by the
    * associated {@link T} during registration, otherwise default duration is
    * {@link #DEFAULT_DURATION_IN_SECS}.
    */
   public void renew() {
       lastUpdateTimestamp = System.currentTimeMillis() + duration;

   }

You may have noticed something of an Easter egg in the Javadoc of isExpired: "Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will not be fixed."
In short, renew() should not add duration when it sets lastUpdateTimestamp. Because the extra time only matters when detecting instances that shut down ungracefully, and a fix could affect services that are already running, the Eureka maintainers left this small error in place.
At this point you may find it odd that the Eureka developers know about the bug and still do not fix it. Presumably they would like to, but they judged that a change could break existing usage while the bug itself has little effect on normal operation. To their credit, they documented both the problem and the reason for leaving it alone in the comment.
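The doubled expiry is easy to check with a small model of the lease. The sketch below is an illustration, not the real Lease class: LeaseSketch is a hypothetical name, the clock is passed in as a parameter instead of calling System.currentTimeMillis(), the evictionTimestamp check is left out for brevity, and 90 seconds is assumed as the lease duration (which I believe is the default).

```java
// Hypothetical simplified model of a lease, with the current time supplied by the caller.
class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs; // registration time, without the extra duration
    }

    /** Mirrors renew(): note the extra "+ duration". */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    /** Mirrors the time comparison in isExpired(additionalLeaseMs). */
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > (lastUpdateTimestamp + durationMs + additionalLeaseMs);
    }

    public static void main(String[] args) {
        long duration = 90_000; // assumed 90 s lease duration
        LeaseSketch lease = new LeaseSketch(duration, 0);
        lease.renew(0); // last heartbeat arrives at t = 0

        System.out.println(lease.isExpired(90_001, 0));  // false: one duration has passed, not expired yet
        System.out.println(lease.isExpired(180_000, 0)); // false: exactly 2 * duration
        System.out.println(lease.isExpired(180_001, 0)); // true: just past 2 * duration
    }
}
```

Under these assumptions, an instance that crashes right after a heartbeat stays in the registry for about 180 seconds rather than 90, plus however long it takes for the next eviction run to fire.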


Aside

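You might ask: since there is already an endpoint for taking a service offline, why does the Eureka Server need to run its own eviction task?
The reason is simple. If a service is stopped by force, for example when the machine loses power, the client never gets to call the offline endpoint to tell the server it is going away. And even when the client stops normally, a network problem may keep its call from reaching the server, so the server still does not learn that the service has gone offline. The server therefore has to run its own eviction task to remove instances that are already down. (This is just my own view; feel free to think it through yourself.)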