1. 程式人生 > >Eureka Client原始碼分析

Eureka Client原始碼分析

作者:陳剛,叩丁狼高階講師。本文為原創文章,轉載請註明出處。    

對於一個優秀的程式設計師而言,一個技術不僅要會用,還要知道他的實現原理和思想,即不僅要知其然還要知其所以然,這樣我們寫程式碼才會特別自信,出現bug才能很快定位到問題所在。接下來我們就來簡單探討一下SpringCloud的實現原理,即:原始碼分析

一.服務註冊

服務註冊與發現是SpringCloud最基礎的部分,我們就從這部分開始著手分析,我們來看一下我們第一章寫的服務註冊案例,圖:
image.png

我們知道當服務(Eureka Client)啟動會主動向 EurekaServer註冊自己,而消費者服務需要呼叫提供者服務實現消費時是需要向EurekaServer獲取目標服務的服務地址等資訊,那麼我們就先來分析一下EurekaClient是如何實現服務註冊的。

回想一下我們構造一個EurekaClient服務例項的時候需要做哪些事情呢?
1.在主程式配置類開啟 @EnableDiscoveryClient(或EnableEurekaClient)標籤開啟EurekaClient功能

@SpringBootApplication
@EnableDiscoveryClient
public class EurekaClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class, args);
    }
}

2.在applicatiton.properties中做一些服務的基礎配置和註冊中心地址配置

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:1111/eureka/  #服務註冊地址
  instance:
    prefer-ip-address: true
server:
  port: 3333
spring:
  application:
    name: consumer1
...

那麼我們就沿著這個2個點來切入原始碼:

點開 @ EnableDiscoveryClient 原始碼如下:

/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     */
    boolean autoRegister() default true;
}

EnableDiscoveryClient的文件註釋:
Annotation to enable a DiscoveryClient implementation
告訴我們:這個EnableDiscoveryClient這個標籤是用來開啟 DiscoveryClient的,那麼我們來看一下 org.springframework.cloud.client.discovery.DiscoveryClient 如下:


/**
 * DiscoveryClient represents read operations commonly available to Discovery service such as
 * Netflix Eureka or consul.io
 * @author Spencer Gibb
 */
public interface DiscoveryClient {

    /**
     * A human readable description of the implementation, used in HealthIndicator
     * @return the description
     */
    String description();

    /**
     * Get all ServiceInstances associated with a particular serviceId
     * @param serviceId the serviceId to query
     * @return a List of ServiceInstance
     */
    List<ServiceInstance> getInstances(String serviceId);

    /**
     * @return all known service ids
     */
    List<String> getServices();

}

DiscoveryClient 的註釋:
DiscoveryClient represents read operations commonly available to Discovery service such as Netflix Eureka or consul.io
告訴我們這個介面定義了服務通常的讀取操作的抽象方法,分析這個介面下的三個方法,作用分別是:獲取備註 ,根據服務id獲取服務列表,獲取所有的服務的id

這個介面貌似到頂了,我們看一下他的實現類你可以看到一個讓你很親切的名字 EurekaDiscoveryClient(Eureka的服務客戶端) 如下:

public class EurekaDiscoveryClient implements DiscoveryClient {

    public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";

    private final EurekaInstanceConfig config;

    private final EurekaClient eurekaClient;

    public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
        this.config = config;
        this.eurekaClient = eurekaClient;
    }

    @Override
    public String description() {
        return DESCRIPTION;
    }

    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
                false);
        List<ServiceInstance> instances = new ArrayList<>();
        for (InstanceInfo info : infos) {
            instances.add(new EurekaServiceInstance(info));
        }
        return instances;
    }

  ...省略程式碼...

    @Override
    public List<String> getServices() {
        Applications applications = this.eurekaClient.getApplications();
        if (applications == null) {
            return Collections.emptyList();
        }
        List<Application> registered = applications.getRegisteredApplications();
        List<String> names = new ArrayList<>();
        for (Application app : registered) {
            if (app.getInstances().isEmpty()) {
                continue;
            }
            names.add(app.getName().toLowerCase());

        }
        return names;
    }
...

可以看到 EurekaDiscoveryClient 依賴了 EurekaClient 介面 ,而對於getServices和 getInstances方法的實現程式碼中都是呼叫了EurekaClient的方法,即:“this.eurekaClient…”

繼續跟蹤 EurekaClient 的原始碼:

@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {

EurekaClient 實現了 LookupService介面,繼續跟蹤 LookupService

/**
 * Lookup service for finding active instances.
 *
 * @author Karthik Ranganathan, Greg Kim.
 * @param <T> for backward compatibility

 */
public interface LookupService<T> {
 /**
     * Returns the corresponding {@link Application} object which is basically a
     * container of all registered <code>appName</code> {@link InstanceInfo}s.
     *
     * @param appName
     * @return a {@link Application} or null if we couldn't locate any app of
     *         the requested appName
     */
    Application getApplication(String appName);

    /**
     * Returns the {@link Applications} object which is basically a container of
     * all currently registered {@link Application}s.
     *
     * @return {@link Applications}
     */
    Applications getApplications();

    /**
     * Returns the {@link List} of {@link InstanceInfo}s matching the the passed
     * in id. A single {@link InstanceInfo} can possibly be registered w/ more
     * than one {@link Application}s
     *
     * @param id
     * @return {@link List} of {@link InstanceInfo}s or
     *         {@link java.util.Collections#emptyList()}
     */
    List<InstanceInfo> getInstancesById(String id);
....省略程式碼....

從註釋 :Lookup service for finding active instances.
可以知道,這個lookupService 的作用是用於查詢活動例項的服務。並提供了一些查詢方法

然而在 EurekaDiscoveryClient 中使用的到底是 EurekaClient 哪個實現類的例項呢?我們繼續跟蹤一下 EurekaClient的實現類 com.netflix.discovery.DiscoveryClient ,從包名可以知道這個類是netflix提供的:

/**
 * The class that is instrumental for interactions with <tt>Eureka Server</tt>.
 *
 * <p>
 * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the
 * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with
 * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from
 * <tt>Eureka Server</tt> during shutdown
 * <p>
 * d) <em>Querying</em> the list of services/instances registered with
 * <tt>Eureka Server</tt>
 * <p>
 *
 * <p>
 * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>
 * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips
 * which do not change. All of the functions defined above fail-over to other
 * {@link java.net.URL}s specified in the list in the case of failure.
 * </p>
 *
 * @author Karthik Ranganathan, Greg Kim
 * @author Spencer Gibb
 *
 */
@Singleton
public class DiscoveryClient implements EurekaClient {

翻譯一下這個類的文件註釋,大致意思為:

DiscoveryClient 和 Eureka Server 類進行互動
向 Eureka Server 註冊服務
向 Eureka Server租約續期
服務關閉,取消租約續期
獲取服務列表
Eureka client需要配置 Eureka Server的url列表

看到這裡我們大概知道,其實真正實現服務發現的Netflix包中的com.netflix.discovery.DiscoveryClient類,我們整理一下這幾個類/介面的關係圖如下:
image.png

接下來我們詳細看一下DiscoveryClient是如何實現服務註冊和發現等功能的,找到DiscoveryClient中的程式碼:

 /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
       ...省略程式碼...

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

           instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
       ...省略程式碼...
    }

從方法名字就可以看出方法裡面初始化了很多的定時任務,

instanceInfo:是根據服務配置創建出來的服務例項相關資訊物件,是在EurekaClientAutoConfiguration類中的 eurekaApplicationInfoManager方法中被建立的

public ApplicationInfoManager eurekaApplicationInfoManager(
                EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }

而 EurekaInstanceConfig 就是application.properties配置的繫結

而instanceInfoReplicator 是一個執行緒物件,他的程式碼如下:

/**
 * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
 * - configured with a single update thread to guarantee sequential update to the remote server
 * - update tasks can be scheduled on-demand via onDemandUpdate()
 * - task processing is rate limited by burstSize
 * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
 *   is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
 *   on-demand update).
 *
 *   @author dliu
 */
class InstanceInfoReplicator implements Runnable {

翻譯註釋知道 InstanceInfoReplicator的作用是用於更新本地instanceinfo並將其複製到遠端伺服器的任務 ,其實就是把本地服務例項的相關配置資訊(地址,埠,服務名等)傳送到註冊中心完成註冊

我們來跟蹤一下他的 run方法 :

 public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

定位關鍵程式碼: discoveryClient.register(); 這裡就是在實現服務註冊,繼續跟蹤進去:

 boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

這裡通過呼叫:eurekaTransport.registrationClient.register(instanceInfo);實現註冊,而instanceInfo其實就是當前服務例項的元資料(配置資訊),繼續跟蹤

/**
 * Low level Eureka HTTP client API.
 *
 * @author Tomasz Bak
 */
public interface EurekaHttpClient {

    EurekaHttpResponse<Void> register(InstanceInfo info);

翻譯 Low level Eureka HTTP client API. 得知這裡是一個HTTP客戶端的API,那麼我們可以大膽猜測,register方法的實現其實就是通過 rest 請求的方式。繼續往下追蹤該方法

public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
    @Override
    public EurekaHttpResponse<Void> register(final InstanceInfo info) {
        return execute(new RequestExecutor<Void>() {
            @Override
            public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
                return delegate.register(info);
            }

            @Override
            public RequestType getRequestType() {
                return RequestType.Register;
            }
        });
    }

看到這裡我們應該就明白了,在register方法中獲取到了 serviceUrl 即配置檔案中的註冊服務地址,,把InstanceInfo作為引數,底層通過EurekaHttpClient(Rest方式)來發請求請求,實現服務註冊。

服務獲取

繼續看com.netflix.discovery.DiscoveryClient的initScheduledTasks()方法,裡面還有兩個定時任務

 if (clientConfig.shouldFetchRegistry()) {
          // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
  ....省略程式碼....

我們先看第一個任務:服務獲取 ,
clientConfig.getRegistryFetchIntervalSeconds()是從配置中獲取服務清單獲取時間間隔,他是執行執行緒是new HeartbeatThread(),我們跟蹤進去

class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
    ...省略程式碼...
 boolean success = fetchRegistry(remoteRegionsModified);
...省略程式碼...

fetchRegistry:就是獲取登錄檔(註冊服務清單)的方法

 private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applicat
...省略程式碼...

etAndStoreFullRegistry(); 獲得並存儲完整的登錄檔,跟蹤進去

private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

我們可以看到 eurekaTransport.queryClient.這樣的程式碼其實就是通過Rest方式去獲取服務清單 最後通過 localRegionApps.set把服務儲存到本地區域

服務續約

繼續看 com.netflix.discovery.DiscoveryClient 中的 initScheduledTasks() 方法中的另一個定時任務

 // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

heartbeat :使用心跳機制實現服務續約,即每間隔多少秒去請求一下注冊中心證明服務還線上,防止被剔除。
renewalIntervalInSecs :就是心跳時間間隔 對應的配置:
eureka.instance.lease-renewal-interval-in-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=90
我們來看一下他的執執行緒:HeartbeatThread

   private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

不難看出他是通過 eurekaTransport.registrationClient.sendHeartBeat:去傳送心跳 ,依然是Rest方式