SpringCloud Eureka 原始碼分析
目錄
SpringCloud-Eureka
整合專案
spring-cloud-netflix-eureka-server
spring-cloud-netflix-eureka-client
Eureka架構圖
關鍵概念
Region(區域)
- 屬於AWS概念,主要為了給基礎設施服務劃分區域,實現高可用,提高網路傳輸速度。
Zone(可用區)
- 屬於AWS概念,為了實現高可用,在每個Region中可以有多個Zone,且每個Zone都是獨立的,一個可用區出問題不會影響到其他可用區,這樣也可以實現網路的低延遲。上圖中的us-east-1c表示的就是一個可用區。
租約(lease)
- Eureka Server 用於管理Erueka Client(主要管理Application Service)
- 客戶端通過每隔30秒,向Eureka Server傳送心跳來續約,如果Eureka Server在90秒內沒有收到客戶端的續約,則會將該客戶端從登錄檔裡刪除。
Eureka Server
- 提供服務的註冊和發現的功能
- Register 提供服務註冊功能
- Renew 提供服務續租約(lease)功能
- Cancel 提供服務登出功能
- Get Registry 提供登錄檔獲取功能,即服務發現
Application Service
- 服務提供者。
Application Client
- 服務消費者,每個client會快取登錄檔的資訊,這樣可以再Eureka Server不可用的時候,不影響服務消費者同服務提供者的互動,同ZK的主要區別,即實現CAP中的AP。
SpringCloud啟動Eureka 的過程:
EnableEurekaServer註解
SpringCloud通過註解EnableEurekaServer啟動eureka服務,其包含了EnableDiscoveryClient。
SpringCloud與jersey Rest框架
eureka 基於jersey實現Rest服務,因此,如果不想採用jersey
jersey啟動
SpringCloud在容器啟動的時候,動態新增過濾器servletContainer 並攔截/eureka/* 的url。在該過濾器初始化的時候便載入了com.sun.jersey.spi.container.servlet.ServletContainer 該filter包含的classes有:作為jersey的WebComponent的ResourceConfig
- com.netflix.eureka.resources.ASGResource
- com.netflix.discovery.provider.DiscoveryJerseyProvider
- com.netflix.eureka.resources.ApplicationsResource
- com.netflix.eureka.resources.StatusResource
- com.netflix.eureka.resources.PeerReplicationResource
- com.netflix.eureka.resources.VIPResource
- com.netflix.eureka.resources.ServerInfoResource
- com.netflix.eureka.resources.InstancesResource
- com.netflix.eureka.resources.SecureVIPResource
再通過WebApplicationProvider初始化jersey服務。具體實現為WebApplicationImpl._initiate方法。
WebApplicationFactory
public static WebApplication createWebApplication() throws ContainerException {
for (WebApplicationProvider wap : ServiceFinder.find(WebApplicationProvider.class)) {
// Use the first provider found
// 建立jersey服務
return wap.createWebApplication();
}
throw new ContainerException("No WebApplication provider is present");
}
再由com.sun.jersey.core.spi.component.ProviderFactory 通過反射例項化com.netflix.discovery.provider.DiscoveryJerseyProvider負責將物件例項化和反序列化傳送到eureka伺服器
private ComponentProvider __getComponentProvider(Class c) {
try {
ComponentInjector ci = new ComponentInjector(this.ipc, c);
ComponentConstructor cc = new ComponentConstructor(this.ipc, c, ci);
// 例項化Provider
Object o = cc.getInstance();
return new ProviderFactory.SingletonComponentProvider(ci, o);
} catch (NoClassDefFoundError var5) {
LOGGER.log(Level.CONFIG, "A dependent class, " + var5.getLocalizedMessage() + ", of the component " + c + " is not found." + " The component is ignored.");
return null;
} catch (InvocationTargetException var6) {
if(var6.getCause() instanceof NoClassDefFoundError) {
NoClassDefFoundError ncdf = (NoClassDefFoundError)var6.getCause();
LOGGER.log(Level.CONFIG, "A dependent class, " + ncdf.getLocalizedMessage() + ", of the component " + c + " is not found." + " The component is ignored.");
return null;
} else {
LOGGER.log(Level.SEVERE, "The provider class, " + c + ", could not be instantiated. Processing will continue but the class will not be utilized", var6.getTargetException());
return null;
}
} catch (Exception var7) {
LOGGER.log(Level.SEVERE, "The provider class, " + c + ", could not be instantiated. Processing will continue but the class will not be utilized", var7);
return null;
}
}
配置資訊載入
ConfigurationClassEnhancer
- 負責註解Configuration和註解Bean等的例項化,如:
- WebMvcConfigurationSupport根據classpath中是否存在gson、jackson等來
- ArchaiusAutoConfiguration載入archaius配置資訊
protected void configureArchaius(ConfigurableEnvironmentConfiguration envConfig) {
if (initialized.compareAndSet(false, true)) {
// 獲取appName 沒有配置則用預設
String appName = this.env.getProperty("spring.application.name");
if (appName == null) {
appName = "application";
log.warn("No spring.application.name found, defaulting to 'application'");
}
// 後面程式碼省略...
}
}
DefaultListableBeanFactory基於代理例項化eureka元件等,如配置資訊,EurekaClient
EurekaClientAutoConfiguration InstanceInfo EurekaClient
EurekaClientConfigBean
SpringCloud對EurekaClient的配置項
EurekaClientAutoConfiguration設定向eureka server或者向其他服務發現元件 註冊需要的資訊即InstanceInfo,此處為SpringCloud做的適配。
@ProvidedBy(EurekaConfigBasedInstanceInfoProvider.class)
@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("instance")// xml格式的節點
@JsonRootName("instance")// json格式的節點
public class InstanceInfo {//程式碼省略...}
@Bean
@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
// 例項化向登錄檔註冊所需要的資訊,如eureka主頁地址、本機ip、appName等
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
EurekaClientAutoConfiguration例項化EurekaClient,設定配置資訊,PropertyBasedClientConfigConstants為配置變數名,以及一些預設值,實現類為DiscoveryClient
- EurekaClientAutoConfiguration 原始碼
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
manager.getInfo(); // force initialization
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
也會通過EurekaAutoServiceRegistration,將服務自動註冊到SpringCloud的服務發現註冊框架,定時進行健康檢查。
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
DiscoveryClient 原始碼
DiscoveryClient 原始碼
- 繼承關係
DiscoveryClient 實現了 EurekaClient,EurekaClient繼承了LookupService。
DiscoveryClient主要負責與eureka server互動,需要配置servers的url,支援故障轉移。eureka client 的主要作用有:
- 註冊實力到eureka server
- 向eureka server 續租約
- 在cleint關閉時,取消同eureka server的租約
- 查詢eureka server中的註冊資訊
EurekaClient定義一個簡單的介面,給DiscoveryClient實現,主要為了相容eureka 1.x版本,使得1.x版本更容易過渡到2.x版本,主要作用有:
- 提供各種不同的方式,以獲取各種InstanceInfos的能力
- 提供獲取客戶端資料的能力,如獲取regions等。
- 提供註冊和訪問健康檢查的能力
LookupService提供查詢所有活動的Instances的介面。
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
}
// 向登錄檔註冊所需的資訊,提供了各種註冊元件需要的實現,也可以自己自定義實現,如
// AbstractInstanceConfig提供了大量的預設資訊;
// MyDataCenterInstanceConfigProvider提供非aws的資料中心;
// CloudInstanceConfigProvider提供aws註冊所需的;
// EurekaInstanceConfig提供了向eureka註冊所需的;
// ApplicationInfoManager 使用的是EurekaInstanceConfig
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
// 客戶端配置 大部分資訊採用DefaultEurekaClientConfig
clientConfig = config;
// 已經過時,主要為了相容遺留客戶端問題
staticClientConfig = clientConfig;
// 傳輸層如http請求超時、重試等資訊
transportConfig = config.getTransportConfig();
// 該eureka例項的資訊 如主機資訊,健康檢測介面等
instanceInfo = myInfo;
if (myInfo != null) {
// 服務唯一地址 如:EUREKA-SERVER/172.16.17.60:eureka-server:30000
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
// 備份登錄檔資訊,當服務端不可用時,客戶度可以從這裡獲取登錄檔資訊
this.backupRegistryProvider = backupRegistryProvider;
// 如果eureka server的地址來源dns服務,則隨機獲取urls
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
// 採用cas Applications存放的時伺服器返回的儲存客戶端註冊資訊的
localRegionApps.set(new Applications());
// cas 遞增版本,防止客戶端註冊舊的資訊
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// 判斷是否需要從eureka server 獲取登錄檔資訊 並初始化相應的度量資訊
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 是否需要將資訊註冊到eureka server上,通過這個開關可以實現,
// 只獲取其他例項的資訊,而不將自己的資訊給其他客戶端發現
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// 屬於aws的基礎概念regin和zone 預設值為us-east-1
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 如果不需要註冊資訊到server和拉取註冊資訊表,則初始化完成。
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// 初始化排程執行緒池 3個核心執行緒 並且為後臺執行。主要負責:
// server 的url更新
// 排程TimedSuperVisorTask被該TimerTask所包裹的執行緒必須是執行緒安全的,負責在子任務超時時,強制子任務超時。
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 例項化心跳執行緒池,1個核心執行緒,預設最大的執行緒數為5個,使用直接提交執行緒
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 例項化登錄檔快取重新整理執行緒池,最大執行緒數預設5個
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
// 排程服務端端點
scheduleServerEndpointTask(eurekaTransport, args);
// 支援dns和配置 做region對映
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// fetchRegistry方法,第一次獲取全部,之後是增量獲取,也可以通過true引數強制全量獲取,如果獲取成功則返回true,如果客戶端和服務端存在問題,則會返回false
// 如果需要拉取登錄檔,且獲取登錄檔失敗,則從本地備份中獲取註冊資訊,也就是ca的高可用的實現。
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// 初始化所有的排程任務 程式碼見下面
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
- initScheduledTasks 初始化排程任務原始碼
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
// 是否需要拉取登錄檔,重新整理本地快取登錄檔
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 預設30秒
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
// 超時後,再次呼叫的時間間隔基數,預設為10,具體演算法可以參考 TimedSupervisorTask的run方法。
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
/**
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
// 程式碼省略...
this.timeoutMillis = timeUnit.toMillis(timeout);
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
// 程式碼省略...
}
public void run() {
Future future = null;
try {
// ...
delay.set(timeoutMillis);
// ...
} catch (TimeoutException e) {
logger.error("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
}
// ...
}
**/
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 是否傳送心跳資訊到server,即續約
if (clientConfig.shouldRegisterWithEureka()) {
// 每次發起續約時間 預設30秒
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// 預設為10 同理
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 更新和同步本地資訊到eureka server
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
// 預設30秒
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
// 執行速度受限burstSize
2); // burstSize
// 狀態改變監聽器,在下面配置是否需要使用該觸發器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
// 觸發更新和同步本地訊息到erueka server
instanceInfoReplicator.onDemandUpdate();
}
};
// 是否註冊本地狀態改變觸發器,如果不註冊,則不會將本地狀態更新,同步到server
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 設定多少秒後,啟動instanceInfoReplicator,預設為40秒
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
- CloudEurekaClient 主要負責傳送心跳資訊,方法為onCacheRefreshed
@Override
protected void onCacheRefreshed() {
if (this.cacheRefreshedCount != null) { //might be called during construction and will be null
long newCount = this.cacheRefreshedCount.incrementAndGet();
log.trace("onCacheRefreshed called with count: " + newCount);
this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
}
}
EurekaServer初始化 EurekaServerAutoConfiguration
EurekaServerConfigBean
SpringCloud關於EurekaServer的配置項
SpringCloud的InstanceRegistry與eureka的InstanceRegistry
SpringCloud通過EurekaServerAutoConfiguration適配,初始化eureka server的資訊,即InstanceRegistry
InstanceRegistry類圖(此類為SpringCloud定義的InstanceRegistry而非netflix的)
- LeaseManager 主要負責租約的管理,如建立、更新和刪除。
- AbstractInstanceRegistry 處理所有來自eureka client的註冊請求
- 提供註冊、續約、登出、過期處理、狀態改變處理
- 增量儲存登錄檔
- PeerAwareInstanceRegistry
- 處理Eureka Server節點間的同步
- 如果Eureka Server啟動的時候會從其他節點獲取登錄檔資訊,如果獲取失敗,Eureka Server會不允許使用者在指定的一段時間(EurekaServerConfig.getWaitTimeInMsWhenSyncEmpty)裡獲取登錄檔資訊。
- 自我保護實現
- 初始化Eureka Server
EurekaServerAutoConfiguration 原始碼
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
// 父類AbstractInstanceRegistry初始化一個新的空資訊的登錄檔
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
//每分鐘更新次數,預設為1
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
//確認取消租約時的值,預設為1
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
// AbstractInstanceRegistry
/**
* Create a new, empty instance registry.
*/
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
// 用於統計
this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
// 記錄最近一次更新,每隔1分鐘更新一次
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
// 清理過期增量資訊的排程器
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
// 預設30秒
serverConfig.getDeltaRetentionTimerIntervalInMs(),
// 預設30秒
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
// getDeltaRetentionTask
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
// getRetentionTimeInMSInDeltaQueue 保持增量資訊的快取時間 預設為3分鐘
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
// PeerAwareInstanceRegistryImpl
@Inject
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
// 分片數量最近一次更新,每個1分鐘更新一次
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
// 先檢查例項是啟動或者關閉(DownOrStartingRule),然後再檢查優先順序(InstanceStatus),再檢查可能存在的租約的狀態(LeaseExistsRule)UP或者OUT_OF_SERVICE
// FirstMatchWinsCompositeRule 從狀態列表裡查詢第一個匹配的,如果狀態列表都沒有匹配的,則使用AlwaysMatchInstanceStatusRule返回預設的狀態UP
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
EurekaServerContext
DefaultEurekaServerContext 繼承 EurekaServerContext
- 初始化DefaultEurekaServerContext
- 本地Eureka Server的上下文,以及暴露給其他元件訪問的服務,如註冊
- DefaultEurekaServerContext原始碼
@PostConstruct
@Override
public void initialize() throws Exception {
logger.info("Initializing ...");
// PeerEurekaNode 負責複製所有的節點更新操作。Server端節點更新的主要實現類
//啟動管理同等的eureka節點(PeerEurekaNode)的生命週期 排程器
peerEurekaNodes.start();
// 根據節點初始化InstanceRegistry,完成服務端的初始化
registry.init(peerEurekaNodes);
logger.info("Initialized");
}
// PeerEurekaNodes
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 先解析url,再更新節點資訊,先銷燬原來的,再更新
updatePeerEurekaNodes(resolvePeerUrls());
// 定義更新任務
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// 排程更新任務
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
// 節點跟新間隔 預設 10分鐘
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: " + node.getServiceUrl());
}
}
//PeerEurekaNodes
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// 獲取舊的urls,準備關閉
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
// 新增新的urls
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
// 如果發現沒改變,則直接返回
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 關閉,移除不再可用的urls
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
// 建立新的urls
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
// createPeerEurekaNode
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// 建立Jersey 客戶端 傳送註冊、狀態更新、心跳等請求
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
// 建立PeerEurekaNode
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
// PeerEurekaNode
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
// 獲取批處理名稱,根據配置的url獲取hostName,如果hostName失敗,則直接採用配置的url做為批處理名稱
String batcherName = getBatcherName();
// 執行任務的客戶端需要實現的介面,提供了兩個介面,一個處理單個任務,一個處理多個任務,都會在同一個時間執行,多個任務的會聚合多個任務的返回結果,且返回的型別是一樣的。處理結果有Success, Congestion, TransientError(任務失敗,但過會會重試), PermanentError(任務失敗,且不再重試)
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
// 建立批處理任務
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
// 批處理任務最大的緩衝區大小,預設1萬個,如果超出,則會判斷哪些任務已經過期,過期則移除,新增新的任務
config.getMaxElementsInPeerReplicationPool(),
// 最大批處理個數,250
batchSize,
// 分配給副本的最大執行緒數 預設20
config.getMaxThreadsForPeerReplication(),
// 最大的批處理間隔 500毫秒
maxBatchingDelayMs,
// 服務不可用後,休息1秒
serverUnavailableSleepTimeMs,
// 網路異常後,每個100毫秒進行重試
retrySleepTimeMs,
// 處理任務
taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
// 緩衝區 1萬
config.getMaxElementsInStatusReplicationPool(),
// 分配給副本處理狀態的最大執行緒數 預設1個
config.getMaxThreadsForStatusReplication(),
// 最大的批處理間隔 500毫秒
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
//PeerAwareInstanceRegistry
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 啟動更新記錄器
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// 初始化註冊資訊快取,以供客戶端查詢使用,採用的是Guava的堆快取,具體實現類 ResponseCacheImpl
initializedResponseCache();
// 啟動檢查是否因為網路分割槽導致的更新急劇下降,從而防止服務被誤刪,即自我保護模式
scheduleRenewalThresholdUpdateTask();
// 初始化獲取其他Region區域的登錄檔資訊的排程器
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
PeerEurekaNodes 與 PeerEurekaNode
PeerEurekaNodes類圖
PeerEurekaNode類圖
- PeerEurekaNodes 通過start方法開啟 管理 PeerEurekaNode的生命週期排程
- PeerEurekaNode 節點間互動的主要實現類,如心跳等
- 定義了當網路異常後,每隔100毫秒進行重試
- 如果伺服器不可用,進行1秒的休眠
- 定義最大的批處理排程間隔 500毫秒
- 定義了最大的批處理請求250
EurekaServerBootstrap
初始化完成EurekaServerContext後,接下來建立EurekaServerBootstrap(其同com.netflix.eureka.EurekaBootStrap的程式碼基本一致,netflix是通過監聽ServletContextListener事件來啟動eureka server),
再通過EurekaServerInitializerConfiguration啟動執行緒呼叫EurekaServerBootstrap的contextInitialized方法
@Override
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
try {
//TODO: is this class even needed now?
// 呼叫contextInitialized Eureka Server進行初始化
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
// 釋出相關事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}
}).start();
}
// EurekaServerBootstrap
public void contextInitialized(ServletContext context) {
try {
// 初始化環境配置資訊,eureka.datacenter和eureka.environment,如果沒配置第一個為default,第二個為test
initEurekaEnvironment();
// 初始化Eureka Server Context,呼叫PeerAwareInstanceRegistryImpl同步註冊資訊
initEurekaServerContext();
// 將EurekaServerContext設定到ServletContext中
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
至此,eureka server也就啟動完成了,接下來都是通過排程來實現互動。
總結
- EurekaClient的責任主要提現在DiscoveryClient類的實現;
EurekaServer的責任主要體現在PeerEurekaNodes與PeerEurekaNode類的實現;
其他相關類:
- 客戶端配置:EurekaClientConfigBean
- 服務端配置:EurekaServerConfigBean
- 服務註冊操作類:SpringCloud與netflix的InstanceRegistry
- EurekaServer啟動類:SpringCloud的EurekaServerBootstrap