1. 程式人生 > >SpringCloud Eureka 原始碼分析

SpringCloud Eureka 原始碼分析

目錄

SpringCloud-Eureka

整合專案

spring-cloud-netflix-eureka-server

spring-cloud-netflix-eureka-client

Eureka架構圖

image

關鍵概念

Region(區域)
  • 屬於AWS概念,主要為了給基礎設施服務劃分區域,實現高可用,提高網路傳輸速度。
Zone(可用區)
  • 屬於AWS概念,為了實現高可用,在每個Region中可以有多個Zone,且每個Zone都是獨立的,一個可用區出問題不會影響到其他可用區,這樣也可以實現網路的低延遲。上圖中的us-east-1c表示的就是一個可用區。
租約(lease)
  • Eureka Server 用於管理Erueka Client(主要管理Application Service)
  • 客戶端通過每隔30秒,向Eureka Server傳送心跳來續約,如果Eureka Server在90秒內沒有收到客戶端的續約,則會將該客戶端從登錄檔裡刪除。
Eureka Server
  • 提供服務的註冊和發現的功能
  • Register 提供服務註冊功能
  • Renew 提供服務續租約(lease)功能
  • Cancel 提供服務登出功能
  • Get Registry 提供登錄檔獲取功能,即服務發現
Application Service
  • 服務提供者。
Application Client
  • 服務消費者,每個client會快取登錄檔的資訊,這樣可以再Eureka Server不可用的時候,不影響服務消費者同服務提供者的互動,同ZK的主要區別,即實現CAP中的AP。

SpringCloud啟動Eureka 的過程:

EnableEurekaServer註解

SpringCloud通過註解EnableEurekaServer啟動eureka服務,其包含了EnableDiscoveryClient。

SpringCloud與jersey Rest框架

eureka 基於jersey實現Rest服務,因此,如果不想採用jersey

,則只需要過濾相關包的依賴即可,SpringCLoud則會採用RestTemplate來發送Rest請求。這也說明了eureka其是基於Servlet實現的。

jersey啟動

SpringCloud在容器啟動的時候,動態新增過濾器servletContainer 並攔截/eureka/* 的url。在該過濾器初始化的時候便載入了com.sun.jersey.spi.container.servlet.ServletContainer 該filter包含的classes有:作為jersey的WebComponentResourceConfig
- com.netflix.eureka.resources.ASGResource
- com.netflix.discovery.provider.DiscoveryJerseyProvider
- com.netflix.eureka.resources.ApplicationsResource
- com.netflix.eureka.resources.StatusResource
- com.netflix.eureka.resources.PeerReplicationResource
- com.netflix.eureka.resources.VIPResource
- com.netflix.eureka.resources.ServerInfoResource
- com.netflix.eureka.resources.InstancesResource
- com.netflix.eureka.resources.SecureVIPResource

再通過WebApplicationProvider初始化jersey服務。具體實現為WebApplicationImpl._initiate方法。

WebApplicationFactory

public static WebApplication createWebApplication() throws ContainerException {
    for (WebApplicationProvider wap : ServiceFinder.find(WebApplicationProvider.class)) {
        // Use the first provider found
        // 建立jersey服務
        return wap.createWebApplication();
    }
    throw new ContainerException("No WebApplication provider is present");
}

再由com.sun.jersey.core.spi.component.ProviderFactory 通過反射例項化com.netflix.discovery.provider.DiscoveryJerseyProvider負責將物件例項化和反序列化傳送到eureka伺服器

    private ComponentProvider __getComponentProvider(Class c) {
        try {
            ComponentInjector ci = new ComponentInjector(this.ipc, c);
            ComponentConstructor cc = new ComponentConstructor(this.ipc, c, ci);
            // 例項化Provider
            Object o = cc.getInstance();
            return new ProviderFactory.SingletonComponentProvider(ci, o);
        } catch (NoClassDefFoundError var5) {
            LOGGER.log(Level.CONFIG, "A dependent class, " + var5.getLocalizedMessage() + ", of the component " + c + " is not found." + " The component is ignored.");
            return null;
        } catch (InvocationTargetException var6) {
            if(var6.getCause() instanceof NoClassDefFoundError) {
                NoClassDefFoundError ncdf = (NoClassDefFoundError)var6.getCause();
                LOGGER.log(Level.CONFIG, "A dependent class, " + ncdf.getLocalizedMessage() + ", of the component " + c + " is not found." + " The component is ignored.");
                return null;
            } else {
                LOGGER.log(Level.SEVERE, "The provider class, " + c + ", could not be instantiated. Processing will continue but the class will not be utilized", var6.getTargetException());
                return null;
            }
        } catch (Exception var7) {
            LOGGER.log(Level.SEVERE, "The provider class, " + c + ", could not be instantiated. Processing will continue but the class will not be utilized", var7);
            return null;
        }
    }

配置資訊載入

ConfigurationClassEnhancer
- 負責註解Configuration和註解Bean等的例項化,如:
- WebMvcConfigurationSupport根據classpath中是否存在gson、jackson等來
- ArchaiusAutoConfiguration載入archaius配置資訊

protected void configureArchaius(ConfigurableEnvironmentConfiguration envConfig) {
    if (initialized.compareAndSet(false, true)) {
        // 獲取appName 沒有配置則用預設
        String appName = this.env.getProperty("spring.application.name");
        if (appName == null) {
            appName = "application";
            log.warn("No spring.application.name found, defaulting to 'application'");
        }
        // 後面程式碼省略...    
    }
}    

DefaultListableBeanFactory基於代理例項化eureka元件等,如配置資訊,EurekaClient

EurekaClientAutoConfiguration InstanceInfo EurekaClient

EurekaClientConfigBean

SpringCloud對EurekaClient的配置項

EurekaClientAutoConfiguration設定向eureka server或者向其他服務發現元件 註冊需要的資訊即InstanceInfo,此處為SpringCloud做的適配。

@ProvidedBy(EurekaConfigBasedInstanceInfoProvider.class)
@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("instance")// xml格式的節點
@JsonRootName("instance")// json格式的節點
public class InstanceInfo {//程式碼省略...}

@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
    // 例項化向登錄檔註冊所需要的資訊,如eureka主頁地址、本機ip、appName等
    InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
    return new ApplicationInfoManager(config, instanceInfo);
}

EurekaClientAutoConfiguration例項化EurekaClient,設定配置資訊,PropertyBasedClientConfigConstants為配置變數名,以及一些預設值,實現類為DiscoveryClient
- EurekaClientAutoConfiguration 原始碼


@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
    manager.getInfo(); // force initialization
    return new CloudEurekaClient(manager, config, this.optionalArgs,
                    this.context);
}

也會通過EurekaAutoServiceRegistration,將服務自動註冊到SpringCloud的服務發現註冊框架,定時進行健康檢查。

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
    return new EurekaAutoServiceRegistration(context, registry, registration);
}

DiscoveryClient 原始碼

  • DiscoveryClient 原始碼

    • 繼承關係

    DiscoveryClient 實現了 EurekaClient,EurekaClient繼承了LookupService
    image

    DiscoveryClient主要負責與eureka server互動,需要配置servers的url,支援故障轉移。eureka client 的主要作用有:

    • 註冊實力到eureka server
    • 向eureka server 續租約
    • 在cleint關閉時,取消同eureka server的租約
    • 查詢eureka server中的註冊資訊

    EurekaClient定義一個簡單的介面,給DiscoveryClient實現,主要為了相容eureka 1.x版本,使得1.x版本更容易過渡到2.x版本,主要作用有:

    • 提供各種不同的方式,以獲取各種InstanceInfos的能力
    • 提供獲取客戶端資料的能力,如獲取regions等。
    • 提供註冊和訪問健康檢查的能力

    LookupService提供查詢所有活動的Instances的介面。

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
    }
    // 向登錄檔註冊所需的資訊,提供了各種註冊元件需要的實現,也可以自己自定義實現,如
    // AbstractInstanceConfig提供了大量的預設資訊;
    // MyDataCenterInstanceConfigProvider提供非aws的資料中心;
    // CloudInstanceConfigProvider提供aws註冊所需的;
    // EurekaInstanceConfig提供了向eureka註冊所需的;
    // ApplicationInfoManager 使用的是EurekaInstanceConfig
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // 客戶端配置 大部分資訊採用DefaultEurekaClientConfig
    clientConfig = config;
    // 已經過時,主要為了相容遺留客戶端問題
    staticClientConfig = clientConfig;
    // 傳輸層如http請求超時、重試等資訊
    transportConfig = config.getTransportConfig();
    // 該eureka例項的資訊 如主機資訊,健康檢測介面等
    instanceInfo = myInfo;
    if (myInfo != null) {
        // 服務唯一地址  如:EUREKA-SERVER/172.16.17.60:eureka-server:30000
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }
    // 備份登錄檔資訊,當服務端不可用時,客戶度可以從這裡獲取登錄檔資訊
    this.backupRegistryProvider = backupRegistryProvider;
    // 如果eureka server的地址來源dns服務,則隨機獲取urls
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    // 採用cas Applications存放的時伺服器返回的儲存客戶端註冊資訊的
    localRegionApps.set(new Applications());
    // cas 遞增版本,防止客戶端註冊舊的資訊
    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    // 判斷是否需要從eureka server 獲取登錄檔資訊 並初始化相應的度量資訊
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }
    // 是否需要將資訊註冊到eureka server上,通過這個開關可以實現,
    // 只獲取其他例項的資訊,而不將自己的資訊給其他客戶端發現
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // 屬於aws的基礎概念regin和zone 預設值為us-east-1
    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 如果不需要註冊資訊到server和拉取註冊資訊表,則初始化完成。
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();

        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // 初始化排程執行緒池 3個核心執行緒 並且為後臺執行。主要負責:
        // server 的url更新
        // 排程TimedSuperVisorTask被該TimerTask所包裹的執行緒必須是執行緒安全的,負責在子任務超時時,強制子任務超時。
        scheduler = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
        // 例項化心跳執行緒池,1個核心執行緒,預設最大的執行緒數為5個,使用直接提交執行緒
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
        // 例項化登錄檔快取重新整理執行緒池,最大執行緒數預設5個
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        // 排程服務端端點
        scheduleServerEndpointTask(eurekaTransport, args);
        // 支援dns和配置 做region對映
        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
    // fetchRegistry方法,第一次獲取全部,之後是增量獲取,也可以通過true引數強制全量獲取,如果獲取成功則返回true,如果客戶端和服務端存在問題,則會返回false
    // 如果需要拉取登錄檔,且獲取登錄檔失敗,則從本地備份中獲取註冊資訊,也就是ca的高可用的實現。
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
    // 初始化所有的排程任務 程式碼見下面
    initScheduledTasks();
    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, this.getApplications().size());
}
  • initScheduledTasks 初始化排程任務原始碼
/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    // 是否需要拉取登錄檔,重新整理本地快取登錄檔
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // 預設30秒
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // 超時後,再次呼叫的時間間隔基數,預設為10,具體演算法可以參考 TimedSupervisorTask的run方法。
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        /**
        public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                                   int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
            // 程式碼省略...
            this.timeoutMillis = timeUnit.toMillis(timeout);
            this.delay = new AtomicLong(timeoutMillis);
            this.maxDelay = timeoutMillis * expBackOffBound;
            // 程式碼省略...
        }

        public void run() {
            Future future = null;
            try {
                // ...
                delay.set(timeoutMillis);
                // ...
            } catch (TimeoutException e) {
                logger.error("task supervisor timed out", e);
                timeoutCounter.increment();
                long currentDelay = delay.get();
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                delay.compareAndSet(currentDelay, newDelay);
            }
            // ...
        }
        **/
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // 是否傳送心跳資訊到server,即續約
    if (clientConfig.shouldRegisterWithEureka()) {
        // 每次發起續約時間 預設30秒
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 預設為10 同理
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // 更新和同步本地資訊到eureka server
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                // 預設30秒
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                // 執行速度受限burstSize
                2); // burstSize
        // 狀態改變監聽器,在下面配置是否需要使用該觸發器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                // 觸發更新和同步本地訊息到erueka server
                instanceInfoReplicator.onDemandUpdate();
            }
        };
        // 是否註冊本地狀態改變觸發器,如果不註冊,則不會將本地狀態更新,同步到server
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        // 設定多少秒後,啟動instanceInfoReplicator,預設為40秒
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
  • CloudEurekaClient 主要負責傳送心跳資訊,方法為onCacheRefreshed
@Override
protected void onCacheRefreshed() {
    if (this.cacheRefreshedCount != null) { //might be called during construction and will be null
        long newCount = this.cacheRefreshedCount.incrementAndGet();
        log.trace("onCacheRefreshed called with count: " + newCount);
        this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
    }
}

EurekaServer初始化 EurekaServerAutoConfiguration

EurekaServerConfigBean

SpringCloud關於EurekaServer的配置項

SpringCloud的InstanceRegistry與eureka的InstanceRegistry

  • SpringCloud通過EurekaServerAutoConfiguration適配,初始化eureka server的資訊,即InstanceRegistry

  • InstanceRegistry類圖(此類為SpringCloud定義的InstanceRegistry而非netflix的)

    • LeaseManager 主要負責租約的管理,如建立、更新和刪除。
    • AbstractInstanceRegistry 處理所有來自eureka client的註冊請求
    • 提供註冊、續約、登出、過期處理、狀態改變處理
    • 增量儲存登錄檔
    • PeerAwareInstanceRegistry
    • 處理Eureka Server節點間的同步
    • 如果Eureka Server啟動的時候會從其他節點獲取登錄檔資訊,如果獲取失敗,Eureka Server會不允許使用者在指定的一段時間(EurekaServerConfig.getWaitTimeInMsWhenSyncEmpty)裡獲取登錄檔資訊。
    • 自我保護實現

image

  • 初始化Eureka Server
    EurekaServerAutoConfiguration 原始碼
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    // 父類AbstractInstanceRegistry初始化一個新的空資訊的登錄檔
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            //每分鐘更新次數,預設為1
            this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
            //確認取消租約時的值,預設為1 
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

// AbstractInstanceRegistry
/**
 * Create a new, empty instance registry.
 */
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    // 用於統計
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
    // 記錄最近一次更新,每隔1分鐘更新一次
    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
    // 清理過期增量資訊的排程器
    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            // 預設30秒
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            // 預設30秒
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

// getDeltaRetentionTask
private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // getRetentionTimeInMSInDeltaQueue 保持增量資訊的快取時間 預設為3分鐘
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

// PeerAwareInstanceRegistryImpl
@Inject
public PeerAwareInstanceRegistryImpl(
        EurekaServerConfig serverConfig,
        EurekaClientConfig clientConfig,
        ServerCodecs serverCodecs,
        EurekaClient eurekaClient
) {
    super(serverConfig, clientConfig, serverCodecs);
    this.eurekaClient = eurekaClient;
    // 分片數量最近一次更新,每個1分鐘更新一次
    this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
    // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
    // then we check the status of a potentially existing lease.
    // 先檢查例項是啟動或者關閉(DownOrStartingRule),然後再檢查優先順序(InstanceStatus),再檢查可能存在的租約的狀態(LeaseExistsRule)UP或者OUT_OF_SERVICE
    // FirstMatchWinsCompositeRule 從狀態列表裡查詢第一個匹配的,如果狀態列表都沒有匹配的,則使用AlwaysMatchInstanceStatusRule返回預設的狀態UP
    this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
            new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}

EurekaServerContext

DefaultEurekaServerContext 繼承 EurekaServerContext
- 初始化DefaultEurekaServerContext
- 本地Eureka Server的上下文,以及暴露給其他元件訪問的服務,如註冊
- DefaultEurekaServerContext原始碼

@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    // PeerEurekaNode 負責複製所有的節點更新操作。Server端節點更新的主要實現類
    //啟動管理同等的eureka節點(PeerEurekaNode)的生命週期 排程器
    peerEurekaNodes.start();
    // 根據節點初始化InstanceRegistry,完成服務端的初始化
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}

// PeerEurekaNodes
public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // 先解析url,再更新節點資訊,先銷燬原來的,再更新
        updatePeerEurekaNodes(resolvePeerUrls());
        // 定義更新任務
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // 排程更新任務
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                // 節點跟新間隔 預設 10分鐘
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  " + node.getServiceUrl());
    }
}

//PeerEurekaNodes
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }
    // 獲取舊的urls,準備關閉
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    // 新增新的urls
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);
    // 如果發現沒改變,則直接返回
    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    // 關閉,移除不再可用的urls
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            // 建立新的urls
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }

    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}

// createPeerEurekaNode
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
    // 建立Jersey 客戶端 傳送註冊、狀態更新、心跳等請求
    HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
    String targetHost = hostFromUrl(peerEurekaNodeUrl);
    if (targetHost == null) {
        targetHost = "host";
    }
    // 建立PeerEurekaNode
    return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}

// PeerEurekaNode
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                     HttpReplicationClient replicationClient, EurekaServerConfig config,
                                     int batchSize, long maxBatchingDelayMs,
                                     long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    this.targetHost = targetHost;
    this.replicationClient = replicationClient;

    this.serviceUrl = serviceUrl;
    this.config = config;
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();
    // 獲取批處理名稱,根據配置的url獲取hostName,如果hostName失敗,則直接採用配置的url做為批處理名稱
    String batcherName = getBatcherName();
    // 執行任務的客戶端需要實現的介面,提供了兩個介面,一個處理單個任務,一個處理多個任務,都會在同一個時間執行,多個任務的會聚合多個任務的返回結果,且返回的型別是一樣的。處理結果有Success, Congestion, TransientError(任務失敗,但過會會重試), PermanentError(任務失敗,且不再重試)
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    // 建立批處理任務
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
            batcherName,
            // 批處理任務最大的緩衝區大小,預設1萬個,如果超出,則會判斷哪些任務已經過期,過期則移除,新增新的任務
            config.getMaxElementsInPeerReplicationPool(),
            // 最大批處理個數,250
            batchSize,
            // 分配給副本的最大執行緒數 預設20
            config.getMaxThreadsForPeerReplication(),
            // 最大的批處理間隔 500毫秒
            maxBatchingDelayMs,
            // 服務不可用後,休息1秒
            serverUnavailableSleepTimeMs,
            // 網路異常後,每個100毫秒進行重試
            retrySleepTimeMs,
            // 處理任務
            taskProcessor
    );
    this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
            targetHost,
            // 緩衝區 1萬
            config.getMaxElementsInStatusReplicationPool(),
            // 分配給副本處理狀態的最大執行緒數 預設1個
            config.getMaxThreadsForStatusReplication(),
            // 最大的批處理間隔 500毫秒
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
}

//PeerAwareInstanceRegistry
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    // 啟動更新記錄器
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // 初始化註冊資訊快取,以供客戶端查詢使用,採用的是Guava的堆快取,具體實現類 ResponseCacheImpl
    initializedResponseCache();
    // 啟動檢查是否因為網路分割槽導致的更新急劇下降,從而防止服務被誤刪,即自我保護模式
    scheduleRenewalThresholdUpdateTask();
    // 初始化獲取其他Region區域的登錄檔資訊的排程器
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}
PeerEurekaNodes 與 PeerEurekaNode

PeerEurekaNodes類圖
image
PeerEurekaNode類圖
image
- PeerEurekaNodes 通過start方法開啟 管理 PeerEurekaNode的生命週期排程
- PeerEurekaNode 節點間互動的主要實現類,如心跳等
- 定義了當網路異常後,每隔100毫秒進行重試
- 如果伺服器不可用,進行1秒的休眠
- 定義最大的批處理排程間隔 500毫秒
- 定義了最大的批處理請求250

EurekaServerBootstrap

初始化完成EurekaServerContext後,接下來建立EurekaServerBootstrap(其同com.netflix.eureka.EurekaBootStrap的程式碼基本一致,netflix是通過監聽ServletContextListener事件來啟動eureka server),
再通過EurekaServerInitializerConfiguration啟動執行緒呼叫EurekaServerBootstrap的contextInitialized方法

@Override
public void start() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                //TODO: is this class even needed now?
                // 呼叫contextInitialized Eureka Server進行初始化
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
                // 釋出相關事件
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {
                // Help!
                log.error("Could not initialize Eureka servlet context", ex);
            }
        }
    }).start();
}

// EurekaServerBootstrap
public void contextInitialized(ServletContext context) {
    try {
        // 初始化環境配置資訊,eureka.datacenter和eureka.environment,如果沒配置第一個為default,第二個為test
        initEurekaEnvironment();
        // 初始化Eureka Server Context,呼叫PeerAwareInstanceRegistryImpl同步註冊資訊
        initEurekaServerContext();
        // 將EurekaServerContext設定到ServletContext中
        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

至此,eureka server也就啟動完成了,接下來都是通過排程來實現互動。

總結

  • EurekaClient的責任主要提現在DiscoveryClient類的實現;
  • EurekaServer的責任主要體現在PeerEurekaNodes與PeerEurekaNode類的實現;

  • 其他相關類:

    • 客戶端配置:EurekaClientConfigBean
    • 服務端配置:EurekaServerConfigBean
    • 服務註冊操作類:SpringCloud與netflix的InstanceRegistry
    • EurekaServer啟動類:SpringCloud的EurekaServerBootstrap