Eureka客戶端初始化(3)
6. 根據配置clientConfig.shouldRegisterWithEureka()生成eurekaTransport的註冊客戶端工廠newRegistrationClientFactory
/**
 * Builds the factory for the canonical client stack used by this transport.
 *
 * <p>Every client it produces is a {@code SessionedEurekaHttpClient} wrapping a factory obtained from
 * {@code RetryableEurekaHttpClient.createFactory}, which in turn is given
 * {@code RedirectingEurekaHttpClient.createFactory(transportClientFactory)} and the legacy server
 * status evaluator.
 *
 * @param name                   name passed to both the sessioned and the retryable layer
 * @param transportConfig        transport settings; supplies the session reconnect interval
 * @param clusterResolver        resolver handed to the retryable layer; shut down with this factory
 * @param transportClientFactory low-level client factory that the redirecting layer delegates to
 * @return a factory whose {@code newClient()} assembles a fresh client stack on every call
 */
static EurekaHttpClientFactory canonicalClientFactory(final String name,
                                                      final EurekaTransportConfig transportConfig,
                                                      final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                      final TransportClientFactory transportClientFactory) {
    return new EurekaHttpClientFactory() {
        @Override
        public EurekaHttpClient newClient() {
            return new SessionedEurekaHttpClient(
                    name,
                    RetryableEurekaHttpClient.createFactory(
                            name,
                            transportConfig,
                            clusterResolver,
                            RedirectingEurekaHttpClient.createFactory(transportClientFactory),
                            ServerStatusEvaluators.legacyEvaluator()),
                    // Seconds -> milliseconds. The long literal forces 64-bit arithmetic so that a
                    // large configured interval cannot overflow before it is widened to long.
                    transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000L
            );
        }

        @Override
        public void shutdown() {
            // Only the cluster resolver is closed here; transportClientFactory is not touched.
            wrapClosable(clusterResolver).shutdown();
        }
    };
}
建立註冊客戶端registrationClient,生成重定向http客戶端
/**
 * Wraps a transport client factory so that every client it hands out is a
 * {@code RedirectingEurekaHttpClient}.
 *
 * <p>A single {@code DnsServiceImpl} is created per call of this method and shared by all clients
 * produced by the returned factory.
 *
 * @param delegateFactory factory passed to each redirecting client and shut down together with
 *                        the returned factory
 * @return a factory producing redirecting clients bound to the endpoint's service URL
 */
public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) {
    final DnsServiceImpl sharedDnsService = new DnsServiceImpl();

    return new TransportClientFactory() {
        @Override
        public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
            final String serviceUrl = endpoint.getServiceUrl();
            return new RedirectingEurekaHttpClient(serviceUrl, delegateFactory, sharedDnsService);
        }

        @Override
        public void shutdown() {
            delegateFactory.shutdown();
        }
    };
}
服務端狀態值解析器ServerStatusEvaluators.legacyEvaluator()
private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() { @Override public boolean accept(int statusCode, RequestType requestType) { if (statusCode >= 200 && statusCode < 300 || statusCode == 302) { return true; } else if (requestType == RequestType.Register && statusCode == 404) { return true; } else if (requestType == RequestType.SendHeartBeat && statusCode == 404) { return true; } else if (requestType == RequestType.Cancel) { // cancel is best effort return true; } else if (requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404)) { return true; } return false; } };
然後包裝成RetryableEurekaHttpClient
/**
 * Creates a factory whose clients are {@code RetryableEurekaHttpClient} instances configured with
 * {@code DEFAULT_NUMBER_OF_RETRIES}.
 *
 * @param name                  name given to every client created
 * @param transportConfig       transport settings forwarded to every client
 * @param clusterResolver       endpoint resolver forwarded to every client
 * @param delegateFactory       per-endpoint client factory; shut down together with the returned factory
 * @param serverStatusEvaluator rule deciding which response statuses are accepted
 * @return the retryable client factory
 */
public static EurekaHttpClientFactory createFactory(final String name,
                                                    final EurekaTransportConfig transportConfig,
                                                    final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                    final TransportClientFactory delegateFactory,
                                                    final ServerStatusEvaluator serverStatusEvaluator) {
    final EurekaHttpClientFactory retryableFactory = new EurekaHttpClientFactory() {
        @Override
        public EurekaHttpClient newClient() {
            return new RetryableEurekaHttpClient(
                    name,
                    transportConfig,
                    clusterResolver,
                    delegateFactory,
                    serverStatusEvaluator,
                    DEFAULT_NUMBER_OF_RETRIES);
        }

        @Override
        public void shutdown() {
            delegateFactory.shutdown();
        }
    };
    return retryableFactory;
}
最後包裝成SessionedEurekaHttpClient後返回
/**
 * Creates a sessioned client.
 *
 * @param name              name of this client; also the key used when registering with {@code Monitors}
 * @param clientFactory     factory stored for later use by this client
 * @param sessionDurationMs base session duration (milliseconds, going by the parameter name)
 */
public SessionedEurekaHttpClient(String name, EurekaHttpClientFactory clientFactory, long sessionDurationMs) {
this.name = name;
this.clientFactory = clientFactory;
this.sessionDurationMs = sessionDurationMs;
// The effective duration of the first session is derived from the base value via randomizeSessionDuration.
this.currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
// NOTE(review): 'this' is published to Monitors before the constructor returns.
Monitors.registerObject(name, this);
}
7. 根據配置clientConfig.shouldFetchRegistry()生成eurekaTransport的查詢客戶端工廠newQueryClientFactory,並由它建立查詢客戶端newQueryClient
// Build the query-client factory from the bootstrap resolver, the transport client factory,
// both configuration objects, this instance's info and the applications source.
newQueryClientFactory = EurekaHttpClients.queryClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
clientConfig,
transportConfig,
applicationInfoManager.getInfo(),
applicationsSource
);
// Create one query client from the factory right away.
newQueryClient = newQueryClientFactory.newClient();
與上面建立註冊客戶端的流程類似,只是傳遞的name不同;最後設定queryClient,scheduleServerEndpointTask方法執行完畢
// Same canonical client stack as for registration, but named EurekaClientNames.QUERY and fed by queryResolver.
canonicalClientFactory(EurekaClientNames.QUERY, transportConfig, queryResolver, transportClientFactory);
8. 根據配置生成不同的區域對映類,這裡是PropertyBasedAzToRegionMapper,預設設定分割槽集合關係。
// Choose the AZ-to-region mapper: DNS-based when service URLs are fetched through DNS,
// property-based otherwise.
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
// Read the reference exactly once so the null check and the split operate on the same value,
// even if the reference is updated in between.
final String regionsToFetchCsv = remoteRegionsToFetch.get();
if (null != regionsToFetchCsv) {
    // The value is a comma-separated list of region names.
    azToRegionMapper.setRegionsToFetch(regionsToFetchCsv.split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
/**
 * Seeds {@code defaultRegionVsAzMap} with the built-in region / availability-zone pairs.
 *
 * <p>Several zones are put under the same region key, in the order listed below.
 * NOTE(review): this presumably relies on {@code defaultRegionVsAzMap} being a multimap; with a
 * plain {@code Map} only the last zone per region would survive. Confirm against the field's
 * declaration.
 */
private void populateDefaultAZToRegionMap() {
    // {region, availability zone}; insertion order matches the original put sequence.
    final String[][] regionZonePairs = {
            {"us-east-1", "us-east-1a"},
            {"us-east-1", "us-east-1c"},
            {"us-east-1", "us-east-1d"},
            {"us-east-1", "us-east-1e"},
            {"us-west-1", "us-west-1a"},
            {"us-west-1", "us-west-1c"},
            {"us-west-2", "us-west-2a"},
            {"us-west-2", "us-west-2b"},
            {"us-west-2", "us-west-2c"},
            {"eu-west-1", "eu-west-1a"},
            {"eu-west-1", "eu-west-1b"},
            {"eu-west-1", "eu-west-1c"},
    };
    for (String[] regionAndZone : regionZonePairs) {
        defaultRegionVsAzMap.put(regionAndZone[0], regionAndZone[1]);
    }
}
9. 獲取註冊服務資訊,如果Delta增量被禁用或者如果是第一次,則獲得所有應用程式。
/**
 * Refreshes the locally held registry, either completely or by applying a delta.
 *
 * <p>A full fetch is performed when any of the following holds: delta is disabled, a single-VIP
 * refresh address is configured, the caller forces it, there is no local {@code Applications}
 * object, it has no registered applications, or its version is {@code -1}. Otherwise only the
 * delta is fetched and applied.
 *
 * <p>Any {@code Throwable} raised while fetching is logged and reported through the return value;
 * it is never propagated to the caller.
 *
 * @param forceFullRegistryFetch {@code true} to bypass the delta path unconditionally
 * @return {@code true} if the fetch completed without throwing, {@code false} otherwise
 */
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all applications
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) { // Client application does not have latest library supporting delta
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            // This branch is reachable with applications == null, so the remaining diagnostics
            // must not dereference it; otherwise the NPE would abort the fetch before it starts.
            if (applications != null) {
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            }
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        // NOTE(review): this updates the instance obtained before the fetch; whether a full fetch
        // replaces that instance is not visible from this method. Confirm this is intended.
        if (applications != null) {
            applications.setAppsHashCode(applications.getReconcileHashCode());
        }
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
獲取所有的註冊資訊
/**
 * Fetches the complete registry from the server and stores it in {@code localRegionApps}.
 *
 * <p>The result is stored only when the response status is OK and no other update advanced
 * {@code fetchRegistryGeneration} while the request was in flight; otherwise it is discarded
 * with a log message.
 *
 * <p>NOTE(review): only a {@code null} single-VIP address selects the "all applications" request;
 * an empty string takes the VIP path. Confirm that is intended.
 *
 * @throws Throwable whatever the underlying query client call throws
 */
private void getAndStoreFullRegistry() throws Throwable {
    final long generationAtStart = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    final EurekaHttpResponse<Applications> httpResponse;
    if (clientConfig.getRegistryRefreshSingleVipAddress() == null) {
        httpResponse = eurekaTransport.queryClient.getApplications(remoteRegionsRef.get());
    } else {
        httpResponse = eurekaTransport.queryClient.getVip(
                clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    }

    // Only an OK response carries a usable entity.
    final Applications apps = httpResponse.getStatusCode() == Status.OK.getStatusCode()
            ? httpResponse.getEntity()
            : null;
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
        return;
    }

    // The CAS succeeds only if the generation is unchanged since this fetch started.
    if (fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
EurekaHttpClientDecorator#getApplications => SessionedEurekaHttpClient#execute:建立EurekaHttpClient(即RetryableEurekaHttpClient)並儲存起來。
/**
 * Runs the request on the current session's client, first rotating the session if it has expired.
 *
 * <p>When the time since the last reconnect reaches the current session duration, the held client
 * is shut down, the reconnect timestamp is reset and a new randomized duration is chosen. A client
 * is then created from the factory if none is held.
 *
 * @param requestExecutor the request to run
 * @param <R>             response entity type
 * @return the response produced by the underlying client
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    final long currentTime = System.currentTimeMillis();
    final long sessionAge = currentTime - lastReconnectTimeStamp;
    final boolean sessionExpired = sessionAge >= currentSessionDurationMs;

    if (sessionExpired) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = currentTime;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        // Detach the old client from the reference before shutting it down.
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }

    EurekaHttpClient activeClient = eurekaHttpClientRef.get();
    if (activeClient == null) {
        activeClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
    }
    return requestExecutor.execute(activeClient);
}
執行RetryableEurekaHttpClient#execute
/**
 * Executes the request, retrying on another endpoint when a server fails or answers with a
 * status the evaluator does not accept.
 *
 * <p>Up to {@code numberOfRetries} attempts are made. Each attempt reuses the client held in
 * {@code delegate} if there is one; otherwise it takes the next endpoint from the candidate list
 * (resolved lazily, once per call) and creates a client for it.
 *
 * @param requestExecutor the request to run
 * @param <R>             response entity type
 * @return the first response whose status the evaluator accepts
 * @throws TransportException if the candidate list is empty, all candidates have been tried, or
 *                            the retry limit is reached
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
// Stays null when the held client is reused, so nothing is quarantined for it on failure.
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
// Resolve the candidate list only once per execute() call.
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
// Every candidate has already been tried during this call.
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
// Keep the working client for subsequent requests.
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Reached after an exception or a status the evaluator rejected: retry on another server.
// Drop the held client only if it is still the one that just failed.
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
獲取主機資訊列表,篩選出合適EurekaEndpoint儲存起來
/**
 * Returns the endpoints worth trying, leaving out quarantined ones where possible.
 *
 * <p>The quarantine set is first pruned to endpoints that are still part of the cluster. If the
 * number of quarantined endpoints reaches the configured fraction of the cluster size (capped at
 * the cluster size), the quarantine is cleared and the full list is returned.
 *
 * @return the full resolved list when nothing is quarantined or the quarantine was just cleared;
 *         otherwise a new list containing only non-quarantined endpoints
 */
private List<EurekaEndpoint> getHostCandidates() {
    final List<EurekaEndpoint> resolvedHosts = clusterResolver.getClusterEndpoints();
    quarantineSet.retainAll(resolvedHosts);

    // If enough hosts are bad, we have no choice but to start over again.
    final int hostCount = resolvedHosts.size();
    final int rawThreshold =
            (int) (hostCount * transportConfig.getRetryableClientQuarantineRefreshPercentage());
    // Guard against a configured fraction above 1.
    final int clearThreshold = Math.min(rawThreshold, hostCount);

    if (quarantineSet.isEmpty()) {
        return resolvedHosts;
    }

    if (quarantineSet.size() >= clearThreshold) {
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
        return resolvedHosts;
    }

    final List<EurekaEndpoint> healthyHosts = new ArrayList<>(hostCount);
    for (EurekaEndpoint candidate : resolvedHosts) {
        if (!quarantineSet.contains(candidate)) {
            healthyHosts.add(candidate);
        }
    }
    return healthyHosts;
}
AsyncResolver#getClusterEndpoints,啟動週期執行緒池執行TimedSupervisorTask
/**
 * Returns the most recently resolved endpoint list.
 *
 * <p>The first caller triggers a warm-up, and the first caller also schedules the refresh task.
 * The task is scheduled with the regular refresh interval as its delay, or with no delay when the
 * warm-up reported failure.
 *
 * @return the current content of {@code resultsRef}
 */
public List<T> getClusterEndpoints() {
    long initialDelayMs = refreshIntervalMs;

    final boolean firstWarmUp = warmedUp.compareAndSet(false, true);
    if (firstWarmUp && !doWarmUp()) {
        // Warm-up failed: run the first refresh immediately.
        initialDelayMs = 0;
    }

    final boolean firstSchedule = scheduled.compareAndSet(false, true);
    if (firstSchedule) {
        scheduleTask(initialDelayMs);
    }

    return resultsRef.get();
}
TimedSupervisorTask#run
/**
 * Runs the supervised task once on {@code executor}, waits up to {@code timeoutMillis} for it,
 * and then re-schedules itself on {@code scheduler}.
 *
 * <p>On success the delay is reset to {@code timeoutMillis}; on timeout it is doubled, capped at
 * {@code maxDelay}. Rejections and other throwables are logged and counted without changing the
 * delay. Nothing is propagated to the caller.
 */
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
// Completed in time: reset any back-off applied by earlier timeouts.
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
// Back off: double the delay, but never beyond maxDelay.
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
// Interrupt the task if it is still running (e.g. after a timeout).
if (future != null) {
future.cancel(true);
}
// Re-arm for the next run unless the scheduler has been shut down.
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
進而執行updateTask獲取EurekaEndpoint端點資訊,其中delegate.getClusterEndpoints()即ZoneAffinityClusterResolver#getClusterEndpoints
/**
 * Refresh job: asks the delegate resolver for the current endpoints and, when it returns a
 * non-null list, publishes it through {@code resultsRef} and records the load time.
 * A {@code null} result leaves the previous list in place. Exceptions from the delegate are
 * logged and swallowed.
 */
private final Runnable updateTask = new Runnable() {
    @Override
    public void run() {
        try {
            final List<T> refreshedEndpoints = delegate.getClusterEndpoints();
            if (refreshedEndpoints == null) {
                logger.warn("Delegate returned null list of cluster endpoints");
            } else {
                resultsRef.getAndSet(refreshedEndpoints);
                lastLoadTimestamp = System.currentTimeMillis();
            }
            // Logged in both cases, including when the delegate returned null.
            logger.debug("Resolved to {}", refreshedEndpoints);
        } catch (Exception e) {
            logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
        }
    }
};