1. 程式人生 > 程式設計 >EurekaClient 原始碼淺析

EurekaClient 原始碼淺析

背景:最近在研究springCloud,對服務註冊中心也非常好奇,然後就看了一下原始碼,而且以後面試也需要了解一下,因此記錄一下

注意:EurekaClient的內容很多,我只分析主幹部分


前提: 這裡的springboot版本為2.1.5.RELEASE,spring-cloud版本為Greenwich.SR1。


eureka架構圖


從這裡我們可以看到幾個重要的概念:

Eureka可以向EurekaServer Register、Renew、Cancel、Get Registy,下面就分別這幾個概念討論吧。

根據springboot的自動配置特性,我們找到org.springframework.cloud.spring-cloud-netflix-eureka-client.2.1.1.RELEASE.spring-cloud-netflix-eureka-client-2.1.1.RELEASE.jar下面的META-INF/spring.factories。


最重要的就是EurekaClientAutoConfiguration這個配置類了

開啟配置類EurekaClientAutoConfiguration擷取一下重要的配置:

EurekaClientConfigBean

@Bean  @ConditionalOnMissingBean(value = EurekaClientConfig.class,search = SearchStrategy.CURRENT)  public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {     EurekaClientConfigBean client = new
EurekaClientConfigBean(); if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) { // We don't register during bootstrap by default,but there will be another // chance later. client.setRegisterWithEureka(false); } return client;} 複製程式碼

這個bean是eureka的配置資訊,ymal配置的字首是 eureka.client

開始的。這個配置類中有大量的eureka client預設配置。

比如:

1.預設的eureka配置:

/** * Default Eureka URL. */public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/";複製程式碼

也就是說,如果沒有配置eureka server的url,它會預設註冊到本地8761地址中。

2.預設從eureka server中拉取配置時間間隔

/** * Indicates how often(in seconds) to fetch the registry information from the eureka  * server. */private int registryFetchIntervalSeconds = 30;複製程式碼

也就是說預設每個30秒從eureka server中拉取一次所有服務的配置資訊。

3.複製例項變化資訊到eureka伺服器所需要的時間間隔

/**  * Indicates how often(in seconds) to replicate instance changes to be replicated to * the eureka server. */private int instanceInfoReplicationIntervalSeconds = 30;複製程式碼

還有很多配置可以自己去了解一下。


EurekaInstanceConfigBean

@Bean@ConditionalOnMissingBean(value = EurekaInstanceConfig.class,search = SearchStrategy.CURRENT)public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,ManagementMetadataProvider managementMetadataProvider) {   String hostname = getProperty("eureka.instance.hostname");   boolean preferIpAddress = Boolean         .parseBoolean(getProperty("eureka.instance.prefer-ip-address"));   String ipAddress = getProperty("eureka.instance.ip-address");   boolean isSecurePortEnabled = Boolean         .parseBoolean(getProperty("eureka.instance.secure-port-enabled"));   String serverContextPath = env.getProperty("server.servlet.context-path","/");   int serverPort = Integer         .valueOf(env.getProperty("server.port",env.getProperty("port","8080")));   Integer managementPort = env.getProperty("management.server.port",Integer.class); // nullable.     String managementContextPath = env         .getProperty("management.server.servlet.context-path"); // nullable.                                                   // should   // be wrapped into   // optional   Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",Integer.class); // nullable   EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);   instance.setNonSecurePort(serverPort);   instance.setInstanceId(getDefaultInstanceId(env));   instance.setPreferIpAddress(preferIpAddress);   instance.setSecurePortEnabled(isSecurePortEnabled);   if (StringUtils.hasText(ipAddress)) {      instance.setIpAddress(ipAddress);   }   if (isSecurePortEnabled) {      instance.setSecurePort(serverPort);   }   if (StringUtils.hasText(hostname)) {      instance.setHostname(hostname);   }   String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");   String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");   if (StringUtils.hasText(statusPageUrlPath)) {      instance.setStatusPageUrlPath(statusPageUrlPath);   }   if (StringUtils.hasText(healthCheckUrlPath)) {      instance.setHealthCheckUrlPath(healthCheckUrlPath);   }   ManagementMetadata metadata = managementMetadataProvider.get(instance,serverPort,serverContextPath,managementContextPath,managementPort);   if (metadata != null) {      instance.setStatusPageUrl(metadata.getStatusPageUrl());      instance.setHealthCheckUrl(metadata.getHealthCheckUrl());      if (instance.isSecurePortEnabled()) {         instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());      }      Map<String,String> metadataMap = instance.getMetadataMap();      metadataMap.computeIfAbsent("management.port",k -> String.valueOf(metadata.getManagementPort()));   }   else {      // without the metadata the status and health check URLs will not be set      // and the status page and health check url paths will not include the      // context path so set them here      if (StringUtils.hasText(managementContextPath)) {         instance.setHealthCheckUrlPath(               managementContextPath + instance.getHealthCheckUrlPath());         instance.setStatusPageUrlPath(               managementContextPath + instance.getStatusPageUrlPath());      }   }   setupJmxPort(instance,jmxPort);   return instance;}複製程式碼

這個主要是eureka client 這個幾點自身的一些配置資訊。

接下來是兩個最主要的bean : DiscoveryClient 和 EurekaServiceRegistry

@Beanpublic DiscoveryClient discoveryClient(EurekaClient client,EurekaClientConfig clientConfig) {   return new EurekaDiscoveryClient(client,clientConfig);}@Beanpublic EurekaServiceRegistry eurekaServiceRegistry() {   return new EurekaServiceRegistry();}複製程式碼

EurekaServiceRegistry 用於服務註冊,DiscoveryClient 用於服務發現

先看 EurekaServiceRegistry


我們看到這個類裡面有幾個方法:

register(EurekaRegistration reg) : 這個方法就是用於eureka client 註冊到eureka server的,不過具體的註冊邏輯不在這裡。Eureka client的註冊動作是在com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法中執行的,其實最終的註冊是發生在 InstanceInfoReplicator類裡面的。 maybeInitializeClient(EurekaRegistration reg): 初始化 eureka client,eureka client 等會再說。 deregister(EurekaRegistration reg):登出下線操作,同樣具體下線操作不在這裡,這裡只是將本節點例項狀態設定為下線而已。 setStatus(EurekaRegistration registration,String status):設定本節點例項的狀態。 getStatus(EurekaRegistration registration):獲取本節點例項狀態。

最後看一下DiscoveryClient,發現有兩個DiscoveryClient,一個是類,一個是介面:


類是netflix提供的,介面時springcloud提供的。回到EurekaClientAutoConfiguration,它在宣告DiscoveryClient這個bean的時候用的是EurekaDiscoveryClient

@Beanpublic DiscoveryClient discoveryClient(EurekaClient client,EurekaClientConfig clientConfig) {   return new EurekaDiscoveryClient(client,clientConfig);}複製程式碼

找到 EurekaDiscoveryClient


發現EurekaDiscoveryClient裡麵包含netflix 裡的EurekaClient介面,而這個介面的預設實現就是DiscoveryClient


也就是說EurekaDiscoveryClient和DiscoveryClient是組合模式,最後呼叫的還是netflix的類DiscoveryClient。這個類才是服務註冊發現的關鍵,接下來分析這個類。

DiscoveryClient

先看看原始碼解釋:


根據這麼一段話,也就是說,這個類是用於和Eureka Server來互動,這個類的主要職責是:

a) <em>Registering</em> the instance with <tt>Eureka Server</tt> 服務註冊 b) <em>Renewal</em>of the lease with <tt>Eureka Server</tt> 服務續約

c) <em>Cancellation</em> of the lease from <tt>Eureka Server</tt> during shutdown  服務下線

d) <em>Querying</em> the list of services/instances registered with <tt>Eureka Server</tt> 獲取註冊列表資訊

DiscoveryClient構造器中的重要的方法 initScheduledTasks();

Fetch Registers : 獲取註冊列表資訊

if(clientConfig.shouldFetchRegistry()){    //registry cache refresh timer    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();    scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),TimeUnit.SECONDS);}複製程式碼

從上面的程式碼可以看出,eureka client 開啟一個scheduler,每個一定的時間(預設是30秒,可以通過registryFetchIntervalSeconds配置)從eureka server 拉取eureka client的配置資訊。進一步看到CacheRefreshThread這個runnable。

class CacheRefreshThread implements Runnable { 
public void run() { 
  refreshRegistry();//主要定時執行該方法 
  } 
}複製程式碼

進一步分析refreshRegistry

void refreshRegistry() {    try {              ...                // 定時獲取註冊資訊              boolean success = fetchRegistry(remoteRegionsModified);        if (success) {            registrySize = localRegionApps.get().size();            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();        }               ...    } catch (Throwable e) {        logger.error("Cannot fetch registry from server",e);    }}複製程式碼

繼續分析 fetchRegistry(boolean forceFullRegistryFetch)方法,裡面最主要的是getAndStoreFullRegistry();來傳送HTTP請求到註冊中心來獲取註冊資訊,並快取到本地

private void getAndStoreFullRegistry() throws Throwable {       ...    Applications apps = null;    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(),remoteRegionsRef.get());    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {        apps = httpResponse.getEntity();    }    logger.info("The response status is {}",httpResponse.getStatusCode());    if (apps == null) {        logger.error("The application is null for some reason. Not storing this information");    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration,currentUpdateGeneration + 1)) {             // 把拉取的資訊快取在 localRegionApps 中             localRegionApps.set(this.filterAndShuffle(apps));        logger.debug("Got full registry with apps hashcode {}",apps.getAppsHashCode());    } else {        logger.warn("Not updating applications as another thread is updating it already");    }}複製程式碼

而那個localRegionApps 就是用於快取拉取資訊的

private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();複製程式碼


Renew 服務的續期

回到initScheduledTasks()方法,

if(clientConfig.shouldRegisterWithEureka()){    ...    // Heartbeat timer    scheduler.schedule(new TimedSupervisorTask("heartbeat",heartbeatExecutor,renewalIntervalInSecs,new HeartbeatThread()),TimeUnit.SECONDS);          ...}else{    logger.info("Not registering with Eureka server per configuration");}複製程式碼

進一步看 HeartbeatThread 這個Runnable

/** * The heartbeat task that renews the lease in the given intervals. */private class HeartbeatThread implements Runnable {    public void run() {        if (renew()) {            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();        }    }}複製程式碼

/** * Renew with the eureka service by making the appropriate REST call */boolean renew() {    EurekaHttpResponse<InstanceInfo> httpResponse;    try {        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),instanceInfo,null);        logger.debug(PREFIX + "{} - Heartbeat status: {}",appPathIdentifier,httpResponse.getStatusCode());        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {            REREGISTER_COUNTER.increment();            logger.info(PREFIX + "{} - Re-registering apps/{}",instanceInfo.getAppName());            long timestamp = instanceInfo.setIsDirtyWithTime();
            // 註冊            boolean success = register();            if (success) {                instanceInfo.unsetIsDirty(timestamp);            }            return success;        }        return httpResponse.getStatusCode() == Status.OK.getStatusCode();    } catch (Throwable e) {        logger.error(PREFIX + "{} - was unable to send heartbeat!",e);        return false;    }}複製程式碼

以上程式碼是每隔一定時間去傳送心跳(預設30秒),如果返回的結果是404,就把client註冊到Eureka Server上。


Register 服務註冊

/** * Register with the eureka service by making the appropriate REST call. */boolean register() throws Throwable {    logger.info(PREFIX + "{}: registering service...",appPathIdentifier);    EurekaHttpResponse<Void> httpResponse;    try {        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    } catch (Exception e) {        logger.warn(PREFIX + "{} - registration failed {}",e.getMessage(),e);        throw e;    }    if (logger.isInfoEnabled()) {        logger.info(PREFIX + "{} - registration status: {}",httpResponse.getStatusCode());    }    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();}複製程式碼

這個方法實在DiscoveryClient類中的。

在DiscoveryClient.initScheduledTasks()中,有一段這麼程式碼

// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(        this,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());複製程式碼

在InstanceInfoReplicator 構造方法中會建立一個scheduler。註冊方法的呼叫是通過InstanceInfoReplicator.instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())傳送註冊請求到註冊中心

public void start(int initialDelayMs) {    if (started.compareAndSet(false,true)) {        instanceInfo.setIsDirty();  // for initial register        Future next = scheduler.schedule(this,initialDelayMs,TimeUnit.SECONDS);        scheduledPeriodicRef.set(next);    }}複製程式碼

public void run() {    try {        discoveryClient.refreshInstanceInfo();        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();        if (dirtyTimestamp != null) {            // 註冊            discoveryClient.register();            instanceInfo.unsetIsDirty(dirtyTimestamp);        }    } catch (Throwable t) {        logger.warn("There was a problem with the instance info replicator",t);    } finally {        Future next = scheduler.schedule(this,replicationIntervalSeconds,TimeUnit.SECONDS);        scheduledPeriodicRef.set(next);    }}複製程式碼


Cancel: 服務下線:

這個方法實在DiscoveryClient類中的。

/** * unregister w/ the eureka service. */void unregister() {    // It can be null if shouldRegisterWithEureka == false    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {        try {            logger.info("Unregistering ...");            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(),instanceInfo.getId());            logger.info(PREFIX + "{} - deregister  status: {}",httpResponse.getStatusCode());        } catch (Exception e) {            logger.error(PREFIX + "{} - de-registration failed{}",e);        }    }}複製程式碼

這個方法的呼叫是通過shutdown()方法的

/** * Shuts down Eureka Client. Also sends a deregistration request to the * eureka server. */@PreDestroy@Overridepublic synchronized void shutdown() {    if (isShutdown.compareAndSet(false,true)) {        logger.info("Shutting down DiscoveryClient ...");        if (statusChangeListener != null && applicationInfoManager != null) {            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());        }        cancelScheduledTasks();        // If APPINFO was registered        if (applicationInfoManager != null                && clientConfig.shouldRegisterWithEureka()                && clientConfig.shouldUnregisterOnShutdown()) {            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);            unregister();        }        if (eurekaTransport != null) {            eurekaTransport.shutdown();        }        heartbeatStalenessMonitor.shutdown();        registryStalenessMonitor.shutdown();        logger.info("Completed shut down of DiscoveryClient");    }}複製程式碼

這個方法上有個註解@PreDestroy,表示在物件銷燬之前觸發的。可以看到這個方法中關閉了定時任務,通知eureka server 下線。

以上就是我對Eureka client的理解,

總結:

1.springcloud 整合了netflix,通過組合模式,EurekaDiscoveryClient包含EurekaClient,核心功能都是由DiscoveryClient來完成的。

2.Eureka client 主要有 Register、Renew、Cancel、Get Registy功能。

3.Eureka client和Server的互動通過大量的定時任務觸發,互動通過http協議,設計核心類為EurekaHttpClient。

4.在DiscoveryClient被建立的時候,在其構造方法中,啟動了三個執行緒池,然後分別啟動了三個定時器任務:註冊當前服務到註冊中心;持續傳送心跳進行續約任務;定時重新整理註冊中心註冊細資訊到本地,所以可以說,在專案啟動的時候這些任務就被執行了。