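Eureka Series (8): How Service Eviction Is Implemented
Overview flowchart of service eviction
The diagram below gives a simple overview of the service eviction flow: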
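Source code analysis of service eviction
First we need to know when the scheduled eviction task is initialized and started. After a Baidu search, I found that it is initialized in the initEurekaServerContext method of the EurekaBootStrap class, which runs when the Eureka Server starts up. Let's look at the source: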
protected void initEurekaServerContext() throws Exception {
    // ... other code omitted
    registry.openForTraffic(applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}
Inside initEurekaServerContext(), the call to registry.openForTraffic(applicationInfoManager, registryCount) is what initializes the eviction task. Let's check the source to confirm:
// Subclass override: falls back to a default count when no instances were fetched
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    super.openForTraffic(applicationInfoManager,
            count == 0 ? this.defaultOpenForTrafficCount : count);
}

// Parent implementation
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}
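To make the threshold arithmetic in openForTraffic concrete, here is a minimal sketch of the same calculation. The class and method names are hypothetical, and the 0.85 value is an assumption (Eureka's default renewal percent threshold); only the formula comes from the code quoted above.

```java
public class RenewThresholdSketch {
    // Assumption: default value of the renewal percent threshold.
    static final double RENEWAL_PERCENT_THRESHOLD = 0.85;

    /**
     * Mirrors the arithmetic quoted above: each instance is expected to
     * renew twice per minute (once every 30 seconds).
     */
    static int renewsPerMinThreshold(int instanceCount) {
        int expectedNumberOfRenewsPerMin = instanceCount * 2;
        return (int) (expectedNumberOfRenewsPerMin * RENEWAL_PERCENT_THRESHOLD);
    }

    public static void main(String[] args) {
        // 10 instances -> 20 expected renewals per minute
        System.out.println("Renew threshold for 10 instances: " + renewsPerMinThreshold(10));
    }
}
```

With 10 registered instances, the server expects 20 renewals per minute and the threshold is the integer part of 20 * 0.85.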
At the end of openForTraffic we can see a call to the parent class's postInit() method, so let's look at postInit next:
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // Start the timer task. By default it runs every 60 seconds
    // and evicts instances whose lease has expired without being renewed.
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
As shown above, Eureka uses evictionTimer.schedule to set up a timer task that, by default, runs every 60 seconds.
Next, let's look at the EvictionTask class, which carries out the actual eviction:
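The cancel-then-reschedule pattern in postInit can be tried out in isolation with java.util.Timer. The following is only a sketch: EvictionTimerSketch, firesRepeatedly, and the 20 ms interval are made up for illustration and are not part of Eureka.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class EvictionTimerSketch {
    // Daemon timer so the sketch never keeps the JVM alive.
    private final Timer timer = new Timer("eviction-sketch", true);
    private final AtomicReference<TimerTask> taskRef = new AtomicReference<>();

    /** Cancels any previously scheduled task, then schedules a fresh one. */
    void postInit(long intervalMs, Runnable body) {
        TimerTask previous = taskRef.get();
        if (previous != null) {
            previous.cancel();
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                body.run();
            }
        };
        taskRef.set(task);
        // First run after intervalMs, then again every intervalMs (fixed delay).
        timer.schedule(task, intervalMs, intervalMs);
    }

    void shutdown() {
        timer.cancel();
    }

    /** Returns true if the task fired at least `runs` times within 5 seconds. */
    static boolean firesRepeatedly(int runs, long intervalMs) {
        EvictionTimerSketch sketch = new EvictionTimerSketch();
        CountDownLatch latch = new CountDownLatch(runs);
        sketch.postInit(intervalMs, latch::countDown);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task repeated: " + firesRepeatedly(3, 20L));
    }
}
```

Passing the same value as both the initial delay and the period, as postInit does, means the first eviction pass happens one full interval after startup rather than immediately.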
@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}
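The body of getCompensationTimeMs() is not shown in this article. Judging from its name and the way its result is passed to evict(), a plausible reading is that it measures how much later than the configured interval the task actually ran (for example because of a GC pause). The sketch below works under that assumption; the class name, the nowNanos parameter, and the field names are all hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CompensationTimeSketch {
    private final AtomicLong lastExecutionNanos = new AtomicLong(0L);
    private final long evictionIntervalMs;

    CompensationTimeSketch(long evictionIntervalMs) {
        this.evictionIntervalMs = evictionIntervalMs;
    }

    /**
     * Assumed logic: extra delay beyond the configured interval since the
     * previous run, never negative. The current time is passed in so the
     * sketch can be exercised deterministically.
     */
    long compensationTimeMs(long nowNanos) {
        long last = lastExecutionNanos.getAndSet(nowNanos);
        if (last == 0L) {
            return 0L; // first run: nothing to compensate for
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - last);
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    public static void main(String[] args) {
        CompensationTimeSketch sketch = new CompensationTimeSketch(60_000L);
        sketch.compensationTimeMs(TimeUnit.SECONDS.toNanos(1));
        // The second run happens 65 s after the first instead of 60 s.
        System.out.println(sketch.compensationTimeMs(TimeUnit.SECONDS.toNanos(66)) + " ms late");
    }
}
```

Under this reading, a task that fires late grants every lease the same amount of extra time, so instances are not evicted merely because the server itself was paused.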
Next, let's look at the implementation of evict():
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    // (Author's note: collect the expired leases first, then evict them.)
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
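As we can see, evict() is where the eviction finally happens: it collects the expired leases, caps how many may be removed in one pass, and cancels a random selection of them through internalCancel.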
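The two ideas in evict(), the per-pass eviction cap and the partial Knuth shuffle, can be isolated in a small sketch. EvictionLimitSketch and its method names are hypothetical, and the 0.85 threshold used in main is an assumed default; the arithmetic and the swap loop follow the code quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class EvictionLimitSketch {
    /** Maximum number of instances that one eviction pass may remove. */
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    /**
     * Picks min(expired.size(), limit) items uniformly at random using a
     * partial Knuth shuffle, without modifying the caller's list.
     */
    static <T> List<T> pickRandom(List<T> expired, int limit, Random random) {
        List<T> pool = new ArrayList<>(expired);
        int toEvict = Math.max(0, Math.min(pool.size(), limit));
        for (int i = 0; i < toEvict; i++) {
            // Swap a random not-yet-chosen element into position i.
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }

    public static void main(String[] args) {
        int limit = evictionLimit(100, 0.85);
        List<Integer> expired = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            expired.add(i);
        }
        List<Integer> chosen = pickRandom(expired, limit, new Random());
        System.out.println("limit=" + limit + ", evicted this pass=" + chosen.size());
    }
}
```

With 100 registered instances and 40 expired leases, only 15 are removed in one pass; the remainder wait for later passes, which gives the self-preservation mechanism a chance to engage before the registry is emptied.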
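Note:
Eureka's service eviction is affected by its self-preservation mechanism, which can prevent instances that are already considered offline from being evicted. The self-preservation mechanism will be explained in the next part of this series.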
You may be wondering how Eureka decides that an instance is no longer available. Let's look at lease.isExpired(additionalLeaseMs), the method used to make that decision.
/**
 * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.
 *
 * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than
 * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect
 * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will
 * not be fixed.
 *
 * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.
 */
public boolean isExpired(long additionalLeaseMs) {
    return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
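As shown above, Eureka uses lastUpdateTimestamp, the time of the last update, to decide whether an instance is still available. You may remember this field from the part on service renewal: every time a client calls the server's renewal endpoint, the server updates that instance's lastUpdateTimestamp. As a reminder, here is the renewal method that updates it: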
/**
 * Renew the lease, use renewal duration if it was specified by the
 * associated {@link T} during registration, otherwise default duration is
 * {@link #DEFAULT_DURATION_IN_SECS}.
 */
public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
You may have noticed something of an "Easter egg" in the Javadoc of isExpired. The note there says that renew() does the "wrong" thing by setting lastUpdateTimestamp to +duration more than it should be, so the expiry is actually 2 * duration; that this is a minor bug which should only affect instances that shut down ungracefully; and that, because of the possible wide-ranging impact on existing usage, it will not be fixed.
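Put simply, renew() should not add duration to the timestamp, because isExpired adds duration again when it checks the lease. The issue only affects how quickly ungracefully stopped instances are detected, and the Eureka maintainers were concerned that changing it would affect deployments already running, so this small error was never corrected.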
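The effect of the extra duration is easy to reproduce with a stripped-down lease. The sketch below is hypothetical: it omits evictionTimestamp, takes the current time as a parameter instead of calling System.currentTimeMillis(), and assumes a 90-second lease duration (the default in Eureka). The two formulas are the ones from renew() and isExpired() quoted above.

```java
public class LeaseExpirySketch {
    private final long durationMs;
    private long lastUpdateTimestamp;

    LeaseExpirySketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    /** Same formula as the quoted renew(): note the extra "+ duration". */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    /** Same comparison as the quoted isExpired(), minus evictionTimestamp. */
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        long duration = 90_000L; // assumed lease duration: 90 seconds
        LeaseExpirySketch lease = new LeaseExpirySketch(duration, 0L);
        lease.renew(0L); // last heartbeat at t = 0

        // One duration after the last heartbeat the lease is still considered valid.
        System.out.println("expired just after 90 s:  " + lease.isExpired(90_001L, 0L));
        // Only after two durations does it count as expired.
        System.out.println("expired just after 180 s: " + lease.isExpired(180_001L, 0L));
    }
}
```

So with a 90-second lease, an instance that crashes right after a heartbeat is not treated as expired until roughly 180 seconds later, and it is only removed on the next eviction pass after that.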
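At this point you may find it curious that the Eureka developers know about the bug yet leave it alone. In fact they would have liked to fix it, but a fix risked breaking existing usage, and since the bug has little impact on normal operation they left it as it is. They were still considerate enough to document both the problem and the reason for not fixing it in the Javadoc.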
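Aside
You might ask: since there is already a service cancellation endpoint, why does the Eureka Server need to run an eviction task of its own?
The answer is simple. If a client is stopped forcibly, for example because the machine loses power, it never gets the chance to call the cancellation endpoint and tell the server that it is going offline. And even when a client shuts down normally, the call to the cancellation endpoint can fail because of a network problem, in which case the server still does not learn that the instance is gone. The server therefore needs its own eviction task to remove instances that are already down. (This is my own view; feel free to think it through for yourself.)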