Spring Cloud原始碼分析之Eureka篇第五章:更新服務列表
在上一章《Spring Cloud原始碼分析之Eureka篇第四章:服務註冊是如何發起的 》,我們知道了作為Eureka Client的應用啟動時,在com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法中,會做以下幾件事:
- 週期性更新服務列表;
- 週期性服務續約;
- 服務註冊邏輯;
概覽
以下圖片來自Netflix官方,圖中顯示Eureka Client會向註冊中心發起Get Registry請求來獲取服務列表,接下來就去看下對應的程式碼實現;
結論提前知曉
看原始碼易犯困,又難保持注意力集中,因此先拋結論吧,這樣不看原始碼也有收穫:
- Eureka client從註冊中心更新服務列表,然後自身會做快取;
- 作為服務消費者,就是從這些快取資訊中獲取的服務提供者的資訊;
- 增量更新的服務以30秒為週期迴圈呼叫;
- 增量更新資料在服務端儲存時間為3分鐘,因此Eureka client取得的資料雖然被稱為"增量更新",仍然可能和30秒前取的資料一樣,所以Eureka client要自己來處理重複資訊;
- 由3、4兩點可以推斷出,Eureka client的增量更新,其實獲取的是Eureka server最近三分鐘內的變更,因此,如果Eureka client有超過三分鐘沒有做增量更新的話(例如網路問題),那麼再呼叫增量更新介面時,那三分鐘內Eureka server的變更就可能獲取不到了,這就造成了Eureka server和Eureka client之間的資料不一致,需要有個方案來及時發現這個問題;
- 正常情況下,Eureka client多次增量更新後,最終的服務列表資料應該Eureka server保持一致,但如果期間發生異常,可能導致和Eureka server的資料不一致,為了暴露這個問題,Eureka server每次返回的增量更新資料中,會帶有一致性雜湊碼,Eureka client用本地服務列表資料算出的一致性雜湊碼應該和Eureka server返回的一致,若不一致就證明增量更新出了問題導致Eureka client和Eureka server上的服務列表資訊不一致了,此時需要全量更新;
- Eureka server上的服務列表資訊對外提供JSON/XML兩種格式下載;
- Eureka client使用jersey的SDK,去下載JSON格式的服務列表資訊;
關於原始碼版本
本次分析的Spring Cloud版本為Edgware.RELEASE,對應的eureka-client版本為1.7.0;
如何做到週期性執行
更新服務列表和服務續約都是週期性迴圈執行的,這是如何實現的呢,來看initScheduledTasks方法的原始碼:
如上圖兩個紅框中所示,scheduler.schedule方法其實啟動的是一個延時執行的一次性任務,不過TimedSupervisorTask內有乾坤,會在每次執行完任務後再啟動一個同樣的任務,這樣就能實現週期性執行任務了,並且TimedSupervisorTask的功能還不止如此,它還負責任務超時、動態調節週期性間隔、執行緒池滿、未知異常等各種情況的處理,推薦您參考《Eureka的TimedSupervisorTask類(自動調節間隔的週期性任務)》瞭解更多細節;
來自官方文件的指導資訊
學習原始碼之前先看文件可以確定大方向,不會因為陷入原始碼細節導致偏離學習目標,如下圖所示:
對上文,我的理解:
- Eureka client從註冊中心更新服務列表,然後自身會做快取;
- 作為服務消費者,就是從這些快取資訊中獲取的服務提供者的資訊;
- 增量更新的服務以30秒為週期迴圈呼叫;
- 增量更新資料在服務端儲存時間為3分鐘,因此Eureka client取得的資料雖然被稱為"增量更新",仍然可能和30秒前取的資料一樣,所以Eureka client要自己來處理重複資訊;
- 由3、4兩點可以推斷出,Eureka client的增量更新,其實獲取的是Eureka server最近三分鐘內的變更,因此,如果Eureka client有超過三分鐘沒有做增量更新的話(例如網路問題),那麼再呼叫增量更新介面時,那三分鐘內Eureka server的變更就可能獲取不到了,這就造成了Eureka server和Eureka client之間的資料不一致,需要有個方案來及時發現這個問題;
- 正常情況下,Eureka client多次增量更新後,最終的服務列表資料應該Eureka server保持一致,但如果期間發生異常,可能導致和Eureka server的資料不一致,為了暴露這個問題,Eureka server每次返回的增量更新資料中,會帶有一致性雜湊碼,Eureka client用本地服務列表資料算出的一致性雜湊碼應該和Eureka server返回的一致,若不一致就證明增量更新出了問題導致Eureka client和Eureka server上的服務列表資訊不一致了,此時需要全量更新;
- Eureka server上的服務列表資訊對外提供JSON/XML兩種格式下載;
- Eureka client使用jersey的SDK,去下載JSON格式的服務列表資訊;
準備工作就到此,接下來學習原始碼,整個過程應圍繞上述點八進行,不要過早陷入某些程式碼細節中;
原始碼分析
-
如下圖紅框所示,更新服務列表的邏輯已經封裝在CacheRefreshThread類中:
-
CacheRefreshThread類中又是呼叫refreshRegistry方法來實現服務列表更新的,refreshRegistry方法如下:
如上圖所示,本文假設應用部署在非AWS環境,所以Eureka client不做region和zone相關的配置,因此上圖綠框中的程式碼不會執行,我們聚焦紅框中的程式碼,先看fetchRegistry方法;
- fetchRegistry方法原始碼如下,請注意中文註釋:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
//用Stopwatch做耗時分析
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 取出本地快取的,之氣獲取的服務列表資訊
Applications applications = getApplications();
//判斷多個條件,確定是否觸發全量更新,如下任一個滿足都會全量更新:
//1. 是否禁用增量更新;
//2. 是否對某個region特別關注;
//3. 外部呼叫時是否通過入參指定全量更新;
//4. 本地還未快取有效的服務列表資訊;
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//這些詳細的日誌可以看出觸發全量更新的原因
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
//全量更新
getAndStoreFullRegistry();
} else {
//增量更新
getAndUpdateDelta(applications);
}
//重新計算和設定一致性hash碼
applications.setAppsHashCode(applications.getReconcileHashCode());
//日誌列印所有應用的所有例項數之和
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
//將本地快取更新的事件廣播給所有已註冊的監聽器,注意該方法已被CloudEurekaClient類重寫
onCacheRefreshed();
//檢查剛剛更新的快取中,有來自Eureka server的服務列表,其中包含了當前應用的狀態,
//當前例項的成員變數lastRemoteInstanceStatus,記錄的是最後一次更新的當前應用狀態,
//上述兩種狀態在updateInstanceRemoteStatus方法中作比較 ,如果不一致,就更新lastRemoteInstanceStatus,並且廣播對應的事件
updateInstanceRemoteStatus();
return true;
}
上述程式碼中已有註釋詳細說明,就不另外贅述了,接下來細看getAndStoreFullRegistry和getAndUpdateDelta這兩個方法,瞭解全量增量更新的細節;
全量更新本地快取的服務列表
- getAndStoreFullRegistry方法負責全量更新,程式碼如下所示,非常簡單的邏輯:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
//由於並沒有配置特別關注的region資訊,因此會呼叫eurekaTransport.queryClient.getApplications方法從服務端獲取服務列表
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
//返回物件就是服務列表
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
}
//考慮到多執行緒同步,只有CAS成功的執行緒,才會把自己從Eureka server獲取的資料來替換本地快取
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//localRegionApps就是本地快取,是個AtomicReference例項
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
- getAndStoreFullRegistry方法中並無複雜邏輯,只有eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())這段需要展開細看,和Eureka server互動的邏輯都在這裡面,方法getApplications的具體實現是在EurekaHttpClientDecorator類:
@Override
public EurekaHttpResponse<Applications> getApplications(final String... regions) {
return execute(new RequestExecutor<Applications>() {
@Override
public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
return delegate.getApplications(regions);
}
@Override
public RequestType getRequestType() {
//本次向Eureka server請求的型別:獲取服務列表
return RequestType.GetApplications;
}
});
}
EurekaHttpClientDecorator類從名字看是個裝飾者模式的實現,看它的其他程式碼,發現各類遠端服務都在此被封裝成API了,例如註冊的:
@Override
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
return execute(new RequestExecutor<Void>() {
@Override
public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
return delegate.register(info);
}
@Override
public RequestType getRequestType() {
return RequestType.Register;
}
});
}
還有續租的:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
final String id,
final InstanceInfo info,
final InstanceStatus overriddenStatus) {
return execute(new RequestExecutor<InstanceInfo>() {
@Override
public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public RequestType getRequestType() {
return RequestType.SendHeartBeat;
}
});
}
- 再繼續追蹤 delegate.register(info),進入了AbstractJerseyEurekaHttpClient類,這裡面是各種網路請求的具體實現,EurekaHttpClientDecorator類中的getApplications、register、sendHeartBeat等方法對應的網路請求響應邏輯在AbstractJerseyEurekaHttpClient中都有具體實現,篇幅所限我們只關注getApplications:
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
//取全量資料的path是""apps"
return getApplicationsInternal("apps/", regions);
}
@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
//取增量資料的path是""apps/delta"
return getApplicationsInternal("apps/delta", regions);
}
//具體的請求響應處理都在此方法中
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
//jersey、resource這些關鍵詞都預示著這是個restful請求
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//發起網路請求,將響應封裝成ClientResponse例項
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
//取得全部應用資訊
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "" : "regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
上述程式碼中,利用jersey-client庫的API向Eureka server發起restful請求,並將響應資料封裝到EurekaHttpResponse例項中返回;
小結:獲取全量資料,是通過jersey-client庫的API向Eureka server發起restful請求實現的,並將響應的服務列表資料放在一個成員變數中作為本地快取;
獲取服務列表資訊的增量更新
獲取服務列表資訊的增量更新是通過getAndUpdateDelta方法完成的,具體分析請看下面的中文註釋:
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
//增量資訊是通過eurekaTransport.queryClient.getDelta方法完成的
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
//delta中儲存了Eureka server返回的增量更新
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
//如果增量資訊為空,就直接發起一次全量更新
getAndStoreFullRegistry();
}
//考慮到多執行緒同步問題,這裡通過CAS來確保請求發起到現在是執行緒安全的,
//如果這期間fetchRegistryGeneration變了,就表示其他執行緒也做了類似操作,因此放棄本次響應的資料
else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
//用Eureka返回的增量資料和本地資料做合併操作,這個方法稍後會細說
updateDelta(delta);
//用合併了增量資料之後的本地資料來生成一致性雜湊碼
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
//Eureka server在返回增量更新資料時,也會返回服務端的一致性雜湊碼,
//理論上每次本地快取資料經歷了多次增量更新後,計算出的一致性雜湊碼應該是和服務端一致的,
//如果發現不一致,就證明本地快取的服務列表資訊和Eureka server不一致了,需要做一次全量更新
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
//一致性雜湊碼不同,就在reconcileAndLogDifference方法中做全量更新
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
上述程式碼中有幾處需要注意:
a. 獲取增量更新資料使用的方法是:eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
b. 將增量更新的資料和本地快取合併的方法是: updateDelta(delta);
c. 通過檢查一致性雜湊碼可以確定歷經每一次增量更新後,本地的服務列表資訊和Eureka server上的是否還保持一致,若不一致就要做一次全量更新,通過呼叫reconcileAndLogDifference方法來完成;
上述a、b、c三點,接下來依次展開:
- 向Eureka server發起網路請求的邏輯和前面全量更新的差不多,也是EurekaHttpClientDecorator和AbstractJerseyEurekaHttpClient這兩個類合作實現的,先看EurekaHttpClientDecorator部分:
@Override
public EurekaHttpResponse<Applications> getDelta(final String... regions) {
return execute(new RequestExecutor<Applications>() {
@Override
public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
return delegate.getDelta(regions);
}
@Override
public RequestType getRequestType() {
return RequestType.GetDelta;
}
});
}
- 再看AbstractJerseyEurekaHttpClient類中的getDelta方法,居然和全量獲取服務列表資料呼叫了相同的方法getApplicationsInternal,只是ur引數不一樣而已;
@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
return getApplicationsInternal("apps/delta", regions);
}
由上述程式碼可見,從Eureka server的獲取增量更新,和一些常見的方式略有區別:
a. 一般的增量更新是在請求中增加一個時間戳或者上次更新的tag號等引數,由服務端根據引數來判斷哪些資料是客戶端沒有的;
b. 而這裡的Eureka client卻沒有這類引數,聯想到前面官方文件中提到的“Eureka會把更新資料保留三分鐘”,就可以理解了:Eureka把最近的變更資料保留三分鐘,這三分鐘內每個Eureka client來請求增量更新是,server都返回同樣的快取資料,只要client能保證三分鐘之內有一次請求,就能保證自己的資料和Eureka server端的保持一致;
c. 那麼如果client有問題,導致超過三分鐘才來獲取增量更新資料,那就有可能client和server資料不一致了,此時就要有一種方式來判斷是否不一致,如果不一致,client就會做一次全量更新,這種判斷就是一致性雜湊碼;
- Eureka client獲取到增量更新後,通過updateDelta方法將增量更新資料和本地資料做合併:
private void updateDelta(Applications delta) {
int deltaCount = 0;
//遍歷所有服務
for (Application app : delta.get