EurekaClient 原始碼淺析
背景:最近在研究springCloud,對服務註冊中心也非常好奇,然後就看了一下原始碼,而且以後面試也需要了解一下,因此記錄一下
注意:EurekaClient的內容很多,我只分析主幹部分
前提: 這裡的springboot版本為2.1.5.RELEASE,spring-cloud版本為Greenwich.SR1。
eureka架構圖
從這裡我們可以看到幾個重要的概念:
Eureka可以向EurekaServer Register、Renew、Cancel、Get Registy,下面就分別這幾個概念討論吧。
根據springboot的自動配置特性,我們找到org.springframework.cloud.spring-cloud-netflix-eureka-client.2.1.1.RELEASE.spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar下面的META-INF/spring.factories。
最重要的就是EurekaClientAutoConfiguration這個配置類了
開啟配置類EurekaClientAutoConfiguration擷取一下重要的配置:
EurekaClientConfigBean@Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class,search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { EurekaClientConfigBean client = new EurekaClientConfigBean(); if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) { // We don't register during bootstrap by default,but there will be another // chance later. client.setRegisterWithEureka(false); } return client;}
複製程式碼
這個bean是eureka的配置資訊,ymal配置的字首是 eureka.client
比如:
1.預設的eureka配置:
/** * Default Eureka URL. */public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/";複製程式碼
也就是說,如果沒有配置eureka server的url,它會預設註冊到本地8761地址中。
2.預設從eureka server中拉取配置時間間隔
/** * Indicates how often(in seconds) to fetch the registry information from the eureka * server. */private int registryFetchIntervalSeconds = 30;複製程式碼
也就是說預設每個30秒從eureka server中拉取一次所有服務的配置資訊。
3.複製例項變化資訊到eureka伺服器所需要的時間間隔
/** * Indicates how often(in seconds) to replicate instance changes to be replicated to * the eureka server. */private int instanceInfoReplicationIntervalSeconds = 30;複製程式碼
還有很多配置可以自己去了解一下。
@Bean@ConditionalOnMissingBean(value = EurekaInstanceConfig.class,search = SearchStrategy.CURRENT)public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,ManagementMetadataProvider managementMetadataProvider) { String hostname = getProperty("eureka.instance.hostname"); boolean preferIpAddress = Boolean .parseBoolean(getProperty("eureka.instance.prefer-ip-address")); String ipAddress = getProperty("eureka.instance.ip-address"); boolean isSecurePortEnabled = Boolean .parseBoolean(getProperty("eureka.instance.secure-port-enabled")); String serverContextPath = env.getProperty("server.servlet.context-path","/"); int serverPort = Integer .valueOf(env.getProperty("server.port",env.getProperty("port","8080"))); Integer managementPort = env.getProperty("management.server.port",Integer.class); // nullable. String managementContextPath = env .getProperty("management.server.servlet.context-path"); // nullable. // should // be wrapped into // optional Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",Integer.class); // nullable EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils); instance.setNonSecurePort(serverPort); instance.setInstanceId(getDefaultInstanceId(env)); instance.setPreferIpAddress(preferIpAddress); instance.setSecurePortEnabled(isSecurePortEnabled); if (StringUtils.hasText(ipAddress)) { instance.setIpAddress(ipAddress); } if (isSecurePortEnabled) { instance.setSecurePort(serverPort); } if (StringUtils.hasText(hostname)) { instance.setHostname(hostname); } String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path"); String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path"); if (StringUtils.hasText(statusPageUrlPath)) { instance.setStatusPageUrlPath(statusPageUrlPath); } if (StringUtils.hasText(healthCheckUrlPath)) { instance.setHealthCheckUrlPath(healthCheckUrlPath); } ManagementMetadata metadata = managementMetadataProvider.get(instance,serverPort,serverContextPath,managementContextPath,managementPort); if (metadata != null) { instance.setStatusPageUrl(metadata.getStatusPageUrl()); instance.setHealthCheckUrl(metadata.getHealthCheckUrl()); if (instance.isSecurePortEnabled()) { instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl()); } Map<String,String> metadataMap = instance.getMetadataMap(); metadataMap.computeIfAbsent("management.port",k -> String.valueOf(metadata.getManagementPort())); } else { // without the metadata the status and health check URLs will not be set // and the status page and health check url paths will not include the // context path so set them here if (StringUtils.hasText(managementContextPath)) { instance.setHealthCheckUrlPath( managementContextPath + instance.getHealthCheckUrlPath()); instance.setStatusPageUrlPath( managementContextPath + instance.getStatusPageUrlPath()); } } setupJmxPort(instance,jmxPort); return instance;}複製程式碼
這個主要是eureka client 這個幾點自身的一些配置資訊。
接下來是兩個最主要的bean : DiscoveryClient 和 EurekaServiceRegistry
@Beanpublic DiscoveryClient discoveryClient(EurekaClient client,EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client,clientConfig);}@Beanpublic EurekaServiceRegistry eurekaServiceRegistry() { return new EurekaServiceRegistry();}複製程式碼
EurekaServiceRegistry 用於服務註冊,DiscoveryClient 用於服務發現
先看 EurekaServiceRegistry
我們看到這個類裡面有幾個方法:
register(EurekaRegistration reg) : 這個方法就是用於eureka client 註冊到eureka server的,不過具體的註冊邏輯不在這裡。Eureka client的註冊動作是在com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法中執行的,其實最終的註冊是發生在 InstanceInfoReplicator類裡面的。 maybeInitializeClient(EurekaRegistration reg): 初始化 eureka client,eureka client 等會再說。 deregister(EurekaRegistration reg):登出下線操作,同樣具體下線操作不在這裡,這裡只是將本節點例項狀態設定為下線而已。 setStatus(EurekaRegistration registration,String status):設定本節點例項的狀態。 getStatus(EurekaRegistration registration):獲取本節點例項狀態。最後看一下DiscoveryClient,發現有兩個DiscoveryClient,一個是類,一個是介面:
類是netflix提供的,介面時springcloud提供的。回到EurekaClientAutoConfiguration,它在宣告DiscoveryClient這個bean的時候用的是EurekaDiscoveryClient
@Beanpublic DiscoveryClient discoveryClient(EurekaClient client,EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client,clientConfig);}複製程式碼
找到 EurekaDiscoveryClient
發現EurekaDiscoveryClient裡麵包含netflix 裡的EurekaClient介面,而這個介面的預設實現就是DiscoveryClient
也就是說EurekaDiscoveryClient和DiscoveryClient是組合模式,最後呼叫的還是netflix的類DiscoveryClient。這個類才是服務註冊發現的關鍵,接下來分析這個類。
DiscoveryClient
先看看原始碼解釋:
根據這麼一段話,也就是說,這個類是用於和Eureka Server來互動,這個類的主要職責是:
a) <em>Registering</em> the instance with <tt>Eureka Server</tt> 服務註冊 b) <em>Renewal</em>of the lease with <tt>Eureka Server</tt> 服務續約c) <em>Cancellation</em> of the lease from <tt>Eureka Server</tt> during shutdown 服務下線
d) <em>Querying</em> the list of services/instances registered with <tt>Eureka Server</tt> 獲取註冊列表資訊DiscoveryClient構造器中的重要的方法 initScheduledTasks();
Fetch Registers : 獲取註冊列表資訊
if(clientConfig.shouldFetchRegistry()){ //registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),TimeUnit.SECONDS);}複製程式碼
從上面的程式碼可以看出,eureka client 開啟一個scheduler,每個一定的時間(預設是30秒,可以通過registryFetchIntervalSeconds配置)從eureka server 拉取eureka client的配置資訊。進一步看到CacheRefreshThread這個runnable。
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();//主要定時執行該方法
}
}複製程式碼
進一步分析refreshRegistry
void refreshRegistry() { try { ... // 定時獲取註冊資訊 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } ... } catch (Throwable e) { logger.error("Cannot fetch registry from server",e); }}複製程式碼
繼續分析 fetchRegistry(boolean forceFullRegistryFetch)方法,裡面最主要的是getAndStoreFullRegistry();來傳送HTTP請求到註冊中心來獲取註冊資訊,並快取到本地
private void getAndStoreFullRegistry() throws Throwable { ... Applications apps = null; EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(),remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } logger.info("The response status is {}",httpResponse.getStatusCode()); if (apps == null) { logger.error("The application is null for some reason. Not storing this information"); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration,currentUpdateGeneration + 1)) { // 把拉取的資訊快取在 localRegionApps 中 localRegionApps.set(this.filterAndShuffle(apps)); logger.debug("Got full registry with apps hashcode {}",apps.getAppsHashCode()); } else { logger.warn("Not updating applications as another thread is updating it already"); }}複製程式碼
而那個localRegionApps 就是用於快取拉取資訊的
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();複製程式碼
Renew 服務的續期
回到initScheduledTasks()方法,
if(clientConfig.shouldRegisterWithEureka()){ ... // Heartbeat timer scheduler.schedule(new TimedSupervisorTask("heartbeat",heartbeatExecutor,renewalIntervalInSecs,new HeartbeatThread()),TimeUnit.SECONDS); ...}else{ logger.info("Not registering with Eureka server per configuration");}複製程式碼
進一步看 HeartbeatThread 這個Runnable
/** * The heartbeat task that renews the lease in the given intervals. */private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } }}複製程式碼
/** * Renew with the eureka service by making the appropriate REST call */boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),instanceInfo,null); logger.debug(PREFIX + "{} - Heartbeat status: {}",appPathIdentifier,httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}",instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime();
// 註冊 boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!",e); return false; }}複製程式碼
以上程式碼是每隔一定時間去傳送心跳(預設30秒),如果返回的結果是404,就把client註冊到Eureka Server上。
Register 服務註冊
/** * Register with the eureka service by making the appropriate REST call. */boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...",appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}",e.getMessage(),e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}",httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}複製程式碼
這個方法實在DiscoveryClient類中的。
在DiscoveryClient.initScheduledTasks()中,有一段這麼程式碼
// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator( this,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());複製程式碼
在InstanceInfoReplicator 構造方法中會建立一個scheduler。註冊方法的呼叫是通過InstanceInfoReplicator.instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())傳送註冊請求到註冊中心
public void start(int initialDelayMs) { if (started.compareAndSet(false,true)) { instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this,initialDelayMs,TimeUnit.SECONDS); scheduledPeriodicRef.set(next); }}複製程式碼
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { // 註冊 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator",t); } finally { Future next = scheduler.schedule(this,replicationIntervalSeconds,TimeUnit.SECONDS); scheduledPeriodicRef.set(next); }}複製程式碼
Cancel: 服務下線:
這個方法實在DiscoveryClient類中的。
/** * unregister w/ the eureka service. */void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(),instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}",httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}",e); } }}複製程式碼
這個方法的呼叫是通過shutdown()方法的
/** * Shuts down Eureka Client. Also sends a deregistration request to the * eureka server. */@PreDestroy@Overridepublic synchronized void shutdown() { if (isShutdown.compareAndSet(false,true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); }}複製程式碼
這個方法上有個註解@PreDestroy,表示在物件銷燬之前觸發的。可以看到這個方法中關閉了定時任務,通知eureka server 下線。
以上就是我對Eureka client的理解,
總結:
1.springcloud 整合了netflix,通過組合模式,EurekaDiscoveryClient包含EurekaClient,核心功能都是由DiscoveryClient來完成的。
2.Eureka client 主要有 Register、Renew、Cancel、Get Registy功能。
3.Eureka client和Server的互動通過大量的定時任務觸發,互動通過http協議,設計核心類為EurekaHttpClient。
4.在DiscoveryClient被建立的時候,在其構造方法中,啟動了三個執行緒池,然後分別啟動了三個定時器任務:註冊當前服務到註冊中心;持續傳送心跳進行續約任務;定時重新整理註冊中心註冊細資訊到本地,所以可以說,在專案啟動的時候這些任務就被執行了。