Eureka Server和Eureka Client註冊探祕
原文作者:李剛
原文地址:Eureka中RetryableClientQuarantineRefreshPercentage引數探祕
前言
我們知道Eureka分為兩部分,Eureka Server和Eureka Client。Eureka Server充當註冊中心的角色,Eureka Client相對於Eureka Server來說是客戶端,需要將自身資訊註冊到註冊中心。本文主要介紹的就是在Eureka Client註冊到Eureka Server時RetryableClientQuarantineRefreshPercentage
引數的使用技巧。
Eureka Client註冊過程分析
Eureka Client註冊到Eureka Server時,首先遇到第一個問題就是Eureka Client端要知道Server的地址,這個引數對應的是eureka.client.service-url.defaultZone
,舉個例子,在Eureka Client的properties檔案中配置如下:
eureka.client.service-url.defaultZone= http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka
如上圖所示,Eureka Client配置對應的Eureka Server地址分別是8761、8762、8763、8764。這裡存在兩個問題:
a. Eureka Client會將自身資訊分別註冊到這四個地址嗎?
b. Eureka Clinent註冊機制是怎樣的?
原始碼面前一目瞭然,帶著這兩個問題我們通過原始碼來解答這兩個問題。Eureka Client在啟動的時候註冊原始碼如下:RetryableEurekaHttpClient
中的execut
方法
@Override protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) { List<EurekaEndpoint> candidateHosts = null; int endpointIdx = 0; for (int retry = 0; retry < numberOfRetries; retry++) { EurekaHttpClient currentHttpClient = delegate.get(); EurekaEndpoint currentEndpoint = null; if (currentHttpClient == null) { if (candidateHosts == null) { candidateHosts = getHostCandidates(); if (candidateHosts.isEmpty()) { throw new TransportException("There is no known eureka server; cluster server list is empty"); } } if (endpointIdx >= candidateHosts.size()) { throw new TransportException("Cannot execute request on any known server"); } currentEndpoint = candidateHosts.get(endpointIdx++); currentHttpClient = clientFactory.newClient(currentEndpoint); } try { EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient); if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) { delegate.set(currentHttpClient); if (retry > 0) { logger.info("Request execution succeeded on retry #{}", retry); } return response; } logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode()); } catch (Exception e) { logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace } // Connection error or 5xx from the server that must be retried on another server delegate.compareAndSet(currentHttpClient, null); if (currentEndpoint != null) { quarantineSet.add(currentEndpoint); } } throw new TransportException("Retry limit reached; giving up on completing the request"); }
按照我的理解,程式碼精簡後內容如下:
int endpointIdx = 0;
//用來儲存所有Eureka Server資訊(8761、8762、8763、8764)
List<EurekaEndpoint> candidateHosts = null;
//numberOfRetries的值程式碼寫死預設為3次
for (int retry = 0; retry < numberOfRetries; retry++) {
/**
*首次進入迴圈時,獲取全量的Eureka Server資訊(8761、8762、8763、8764)
*/
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
}
/**
*通過endpointIdx自增,依次獲取Eureka Server資訊,然後傳送
*註冊的Post請求.
*/
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
try {
/**
*傳送註冊的Post請求動作,注意如果成功,則跳出迴圈,如果失敗則
*根據endpointIdx依次獲取下一個Eureka Server.
*/
response = requestExecutor.execute(currentHttpClient);
return respones;
} catch (Exception e) {
//向註冊中心(Eureka Server)發起註冊的post出現異常時,列印日誌...
}
//如果此次註冊動作失敗,將當前的資訊儲存到quarantineSet中(一個Set集合)
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
//如果都失敗,則以異常形式丟擲...
throw new TransportException("Retry limit reached; giving up on completing the request");
上面程式碼中還有一個方法很重要就是 List<EurekaEndpoint> candidateHosts = getHostCandidates();
接下來看下getHostCandidates()
方法原始碼
private List<EurekaEndpoint> getHostCandidates() {
List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
quarantineSet.retainAll(candidateHosts);
// If enough hosts are bad, we have no choice but start over again
int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
if (quarantineSet.isEmpty()) {
// no-op
} else if (quarantineSet.size() >= threshold) {
logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
quarantineSet.clear();
} else {
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}
按照我的理解,將程式碼精簡下,只包括關鍵邏輯,內容如下:
private List<EurekaEndpoint> getHostCandidates() {
/**
* 獲取所有defaultZone配置的註冊中心資訊(Eureka Server),
* 在本文例子中代表4個(8761、8762、8763、8764)Eureka Server
*/
List candidateHosts = clusterResolver.getClusterEndpoints();
/**
* quarantineSet這個Set集合中儲存的是不可用的Eureka Server
* 此處是拿不可用的Eureka Server與全量的Eureka Server取交集
*/
quarantineSet.retainAll(candidateHosts);
/**
* 根據RetryableClientQuarantineRefreshPercentage引數計算閾值
* 該閾值後續會和quarantineSet中儲存的不可用的Eureka Server個數
* 作比較,從而判斷是否返回全量的Eureka Server還是過濾掉不可用的
* Eureka Server。
*/
int threshold =
(int) (
candidateHosts.size()
*
transportConfig.getRetryableClientQuarantineRefreshPercentage()
);
if (quarantineSet.isEmpty()) {
/**
* 首次進入的時候,此時quarantineSet為空,直接返回全量的
* Eureka Server列表
*/
} else if (quarantineSet.size() >= threshold) {
/**
* 將不可用的Eureka Server與threshold值相比較,如果不可
* 用的Eureka Server個數大於閾值,則將之前儲存的Eureka
* Server內容直接清空,並返回全量的Eureka Server列表。
*/
quarantineSet.clear();
} else {
/**
* 通過quarantineSet集合儲存不可用的Eureka Server來過濾
* 全量的EurekaServer,從而獲取此次Eureka Client要註冊要
* 註冊的Eureka Server例項地址。
*/
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}
通過原始碼分析,我們現在初步知道,當Eureka Client向Eureka Server發起註冊請求的時候(根據defaultZone尋找Eureka Server列表),如果有一次請求註冊成功,那麼後續就不會在向其他Eureka Server發起註冊請求。以本文為例,註冊中心有四個(8761、8762、8763、8764)。如果8761對應的Eureka Server服務的狀態是UP,那麼Eureka Client向該註冊中心註冊成功後,不會再向(8762、8763、8764)對應的Eureka Server發起註冊請求(對應程式是在for迴圈中直接return respones)。
說到這裡又引出來另外一個問題,如果8761這個Eureka Server是down掉的呢?
根據原始碼我們可知Eureka Client首次會向8761這個Server發起註冊請求,如果該Server的狀態是down,那麼它會將該Server儲存到quarantineSet這個Set集合中,然後再次訪問8762這個Eureka Server,如果8762這個Server的狀態依舊是down,它也會把這個Server儲存到quarantineSet這個Set集合中,然後繼續訪問8763這個Server,如果8763這個Server的狀態依舊是down,此時除了會將其儲存到quarantineSet這個Set集合中之外,還會跳出本次迴圈。從而結束此次註冊過程。
說道這裡有人要問接下來會不會向8764這個Server發起註冊,答案是否定的,因為迴圈的次數預設是3次。所以即使8764這個Server的狀態是UP,它也不會接收到來自Eureka Client發起的註冊資訊。
Eureka Client向Eureka Server發起註冊資訊的過程除了在Eureka Client啟動的時候觸發,還有另外一種方式,就是後臺定時任務。
假設我們上面描述的場景是在Eureka Client啟動的時候,因為在啟動的時候註冊這個過程全部失敗了,當後臺定時任務執行時,還會進入該註冊流程。注意此時quarantineSet的值為3(8761、8762、8763之前註冊失敗的Eureka Server)。
所以當程式再次進入getHostCandidates()
方法時,if (quarantineSet.isEmpty())
這個方法是不滿足的,接下來會走else if (quarantineSet.size() >= threshold)
這個判斷,如果這個判斷成立,那麼會將quarantineSet集合清空,同時返回全量的Eureka Server列表,如果這個判斷不成立,會拿quarantineSet集合中儲存的內容去過濾Eureka Server的全量列表。以本文為例:quarantineSet
中儲存的是(8761、8762、8763)三個Eureka Server,Eureka Server全量列表的內容是(8761、8762、8763、8764)四個Eureka Server,過濾後返回的結果為8764這個Eureka Server。
在本文的例子中8761、8762、8763這三個Eureka Server的狀態是down而8764這個Eureka Server的狀態是UP,我們其實是想走到最後的else分支,從而完成過濾操作,並最終得到8764這個Server,遺憾的是它並不會走到這個分支,而是被上面的else if (quarantineSet.size() >= threshold)
這個分支所攔截,返回的依舊是全量的Eureka Server列表。這樣造成的後果就是Eureka Client依舊會依次向(8761、8762、8763)這三個down的Eureka Server發起註冊請求。
那麼問題的關鍵在哪裡呢?問題的關鍵就是threshold這個值的由來,因為此時quarantineSet.size()的值為3,而3這個值大於threshold,從而導致,會將quarantineSet集合清空,返回全量的Server列表。
我們知道threshold這個值是根據全量的Eureka Server列表乘以一個可配置的引數計算出來的,在本文的例子當中,我的properties檔案中除了defaultZone之外並沒有配置這個引數,那麼也就是說這個引數是有預設值的,通過原始碼我們瞭解到,這個預設值是0.66。具體原始碼如下:
final class PropertyBasedTransportConfigConstants {
/**
*省略部分原始碼
*/
static class Values {
static final int SESSION_RECONNECT_INTERVAL = 20*60;
//預設值為0.66
static final double QUARANTINE_REFRESH_PERCENTAGE = 0.66;
static final int DATA_STALENESS_TRHESHOLD = 5*60;
static final int ASYNC_RESOLVER_REFRESH_INTERVAL = 5*60*1000;
static final int ASYNC_RESOLVER_WARMUP_TIMEOUT = 5000;
static final int ASYNC_EXECUTOR_THREADPOOL_SIZE = 5;
}
}
/**
*@return the percentage of the full endpoints set above which the
*quarantine set is cleared in the range [0, 1.0]
*/
double getRetryableClientQuarantineRefreshPercentage();
看到這裡就不難理解了,因為這個值是0.66而此時全量的Eureka Server值為4。計算之後的值為2,而由於註冊的for迴圈為3次,所以當第二次發起註冊流程的時候quarantineSet的值始終大於threshold。這樣就會導致一個問題,就是如果8761、8762、8763一直是down即使8764一直是好的,那麼Eureka Client也不會註冊成功。而且這個引數值的區間為0到1.
既然通過原始碼分析我們找到了問題根源,其實對應的我們也找到了解決這個問題的辦法,就是對應把這個引數值調大些。
這個值在properties中對應的寫法如下:
eureka.client.transport.retryableClientQuarantineRefreshPercentage = xxx
接下來我們修改下properties檔案,修改後的內容如下:
eureka.client.service-url.defaultZone=
http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka
eureka.client.transport.retryableClientQuarantineRefreshPercentage=1
接下來按照這個配置再次回顧下上面的流程:
- Eureka Client啟動時進行註冊(8761、8762、8763的狀態是down),所以此時quarantineSet的值為3.
- 接下來在定時任務中又觸發註冊事件,此時因為引數的值從0.66調整為1。所以計算出的threshold的值為4。而此時quarantineSet的值為3。所以不會進入到
else if (quarantineSet.size() >= threshold)
分支,而是會進入最後的esle分支。 - 在else分支中會完成過濾功能,最終返回的list中的結果只有一個就是8764這個Eureka Server。
- Eureka Client向8764這個Eureka Server發起註冊請求,得到成功相應,並返回。
遺留問題
說道這裡我們感覺好像是解決了這個問題,那麼問一個問題,這個引數值可以設定的無限大嗎?
比如我將這個引數值設定為10,雖然javaDoc中說明這個引數值的範圍在0-1之間,但是並沒有說明如果將這個引數調整大於1會出現什麼情況。接下來按照上面的流程我們分析下:
之前我們分析的流程中的前提是8761、8762、8763這三臺Server的狀態是down而8764這個server的狀態是up,現在我們修改下這個前提。
假設一開始8761、8762、8763、8764這四臺Eureka Server的狀態都是down。Eureka Client啟動時進行註冊(8761、8762、8763的狀態是down),所以此時quarantineSet的值為3.
- 接下來在定時任務中又觸發註冊事件,此時因為引數的值從0.66調整為10。所以計算出的threshold的值為40。而此時quarantineSet的值為3。所以不會進入到
else if (quarantineSet.size() >= threshold)
分支,而是會進入最後的esle分支。 - 在else分支中會完成過濾功能,最終返回的list中的結果只有一個就是8764這個Eureka Server。
- Eureka Client向8764這個Eureka Server發起註冊請求,因為此時8764的狀態也是down導致註冊失敗,此時quarantineSet中的內容是(8761、8762、8763、8764)
- 當定時任務再次觸發時
if (quarantineSet.isEmpty())
這個分支不會進入,因為此時quarantineSet的值為4 else if (quarantineSet.size() >= threshold)
這分支也不會進入因為threshold的值為40- 最終會進入else分支,這個分支原本的含義是想通過quarantineSet來充當過濾器,從全量的Eureka Server中過濾掉之前狀態為down的Eureka Server,但是由於quarantineSet的值現在已經是全量,導致過濾後的結果返回的是一個空的list。即使此時Eureka Server列表(8761、8762、8763、8764)任何一個Server的狀態變為UP,該Eureka Client也不可能完成註冊事件。
解決辦法
上面出現的那個問題,根本原因個人認為是由於eureka.client.transport.retryableClientQuarantineRefreshPercentage
引數過大而原始碼中沒有校驗,從而導致沒有進入else if (quarantineSet.size() >= threshold)
的邏輯分支,因為此時如果quarantineSet中的值已經達到了所有Eureka Server列表,那麼此時我們希望的是將這個Set集合清空,從而再次返回全量的Eureka Server列表,也就是說再重新來一次註冊流程。
所以基於上面的分析,個人認為在原始碼的getHostCandidates
增加下校驗,具體程式碼如下:
private List<EurekaEndpoint> getHostCandidates() {
List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
quarantineSet.retainAll(candidateHosts);
// If enough hosts are bad, we have no choice but start over again
int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
/**
* 增加判斷如果threshold的值過大,即超過Eureka Server
* 列表的數量,那麼將其再次賦值,賦值的內容為Eureka Server
* 列表的數量。
*/
if (threshold > candidateHosts.size()) {
threshold = candidateHosts.size();
}
if (quarantineSet.isEmpty()) {
// no-op
} else if (quarantineSet.size() >= threshold) {
logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
quarantineSet.clear();
} else {
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}