SpringCloud原始碼閱讀1-EurekaServer原始碼的祕密
題外話:
Spring Cloud Netflix 作為springcloud 我們常用的一個專案,其子專案Eureka,zuul,Rebbion是我熟悉的。但是Spring Cloud Netflix 被宣佈進入了維護模式, 意思不再新增新特性了,這對於我們來說很不友好了。 大家紛紛尋找相應的替代工具。(具體可以網上搜索)
但這不影響我們學習一些元件的框架思想。我對註冊發現,負載均衡這塊比較感興趣。所以在此記錄下自己的閱讀心得。
版本說明:Finchley.SR1
1.元件的配置:
1.1 啟用Eureka註冊中心
當我們在springboot的啟動類上加上@EnableEurekaServer
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaServerApplication.class,args);
}
}
複製程式碼
@EnableEurekaServer
僅僅是引入EurekaServerMarkerConfiguration
類。
Marker的英文意思是標記的意思,spring相關框架中有很多類似xxxMarkerxxx
xxxMarkerxxx
類就表示開啟,沒有表示關閉。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
複製程式碼
EurekaServerMarkerConfiguration
開關開啟的是哪個類呢??
org.springframework.cloud.netflix.eureka.server
EurekaServerAutoConfiguration
,此類在自動注入的過程中,會判斷開關是否開啟來決定是否自動注入相關類
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
複製程式碼
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
.....
}
複製程式碼
由此看出EurekaServerMarkerConfiguration
開關開啟的EurekaServerAutoConfiguration
。
1.2 元件的配置。
下面我們看看EurekaServerAutoConfiguration
配置了什麼東西。
(1.先看註解上相關配置
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
...
}
複製程式碼
- 引入
EurekaServerInitializerConfiguration
類,此類繼承了SmartLifecycle
介面,所以會在spring啟動完畢時回撥此類的start()方法 - EurekaDashboardProperties 表示Euerka面板相關配置屬性。例如:是否開啟面板;面板的訪問路徑
- InstanceRegistryProperties 表示例項註冊相關配置屬性。例如:每分鐘最大的續約數量,預設開啟的通訊數量 等
- 載入
/eureka/server.properties
的配置屬性。
(2.再看類內部相關配置(程式碼比較長,這裡只講內容,建議開啟原始碼看) 尋找類中的Bean
- HasFeatures 註冊HasFeatures表示Eureka特徵,
- EurekaServerConfigBean配置類,表示EurekaServer的配置資訊。通過
@ConfigurationProperties(“eureka.server”)
對映我們的配置檔案中的eureka.server.xxxx
格式的配置資訊(此類很重要啊,我們想修改EurekaServer的配置資訊,可以配置eureka.server.xxxx
覆蓋此類中的預設配置) - EurekaController: 面板的訪問配置預設是“/”
- 註冊編碼器
(ServerCodecs)CloudServerCodecs
- PeerAwareInstanceRegistry:對等節點同步器。 多個節點下複製相關。 與註冊中心高可用有關的元件。此處註冊的是
InstanceRegistry
(注意PeerAwareInstanceRegistry實現了AbstractInstanceRegistry,這裡準確的說是 對等節點+當前節點同步器) - PeerEurekaNodes: Eureka-Server 叢集節點的集合。儲存了叢集下各個節點資訊。也是與高可用有關。
- EurekaServerContext : 上下文。預設註冊的
DefaultEurekaServerContext
- EurekaServerBootstrap: EurekaServer啟動器。EurekaServerBootstrap
- FilterRegistrationBean: 註冊 Jersey filter過濾器。這裡有必要講一下。Eureka也是servlet應用。不過他是通過Jersey 框架來提供介面的。Jersey 框架是一個類Springmvc的web框架。我們專案中大多都是使用springmvc來處理。所以註冊 Jersey filter過濾器,把
/eureka
開頭的請求都交給Jersey 框架去解析。容器是com.sun.jersey.spi.container.servlet.ServletContainer
- ApplicationResource: 暴漏
com.netflix.discovery","com.netflix.eureka"
包路徑下的介面。通常我們再springmvc中通過Controller概念來表示介面,Jersey框架下用ApplicationResource的概念來表示介面。暴露的介面其實就是eureka各個應用通訊的介面。(下面再說這些介面)
EurekaServerAutoConfiguration
基本上就做了這些工作。我們來歸類總結下
針對當前Eureka例項的相關元件:
- EurekaDashboardProperties:面板屬性
- EurekaController: 面板的訪問的處理器。
- InstanceRegistryProperties:例項註冊相關屬性
- (EurekaServerConfig)EurekaServerConfigBean:當前ErekekaServer相關配置
- EurekaServerContext : 當前Eureka 註冊中心上下文
- 請求相關元件:註冊
/eureka
路徑的相關介面,註冊攔截/eureka
的攔截器,註冊com.sun.jersey.spi.container.servlet.ServletContainer
容器來處理對應的請求
兩個針對叢集下相關元件:
- PeerAwareInstanceRegistry:用於叢集下的節點相關複製資訊用
- PeerEurekaNodes:叢集下的所有節點資訊
兩個針對啟動相關類:
- EurekaServerInitializerConfiguration: 對接spring,再spring啟動完成後,呼叫
- EurekaServerBootstrap:啟動器,用於啟動當前Eureak例項的上下文
至此:我們也可以大致瞭解了一個EurekaServer大致長什麼樣子了。
2.EurekaServerContext初始化:
EurekaServerContext作為上下文,應該是核心所在。上文講過註冊DefaultEurekaServerContext
。此類中有@Inject,@PostConstruct,@PreDestroy
註解的方法,重點來看看。
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,ServerCodecs serverCodecs,PeerAwareInstanceRegistry registry,PeerEurekaNodes peerEurekaNodes,ApplicationInfoManager applicationInfoManager) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.registry = registry;
this.peerEurekaNodes = peerEurekaNodes;
this.applicationInfoManager = applicationInfoManager;
}
複製程式碼
2.1 @Inject註解的構造方法
@Inject
註解的方法,引數由IOC容器注入。serverConfig,serverCodecs,registry,peerEurekaNodes
我們已經認識了。ApplicationInfoManager 是用來管理應用資訊的,也就是例項註冊資訊,由ApplicationInfoManager統一管理。
2.2 @PostConstruct註解的initialize()方法
@PostConstruct
修飾的方法會在伺服器載入Servle的時候執行,並且只會被伺服器執行一次,被@PostConstruct
修飾的方法會在建構函式之後,init()方法之前執行.
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
複製程式碼
這個方法很簡明,主要有兩個重要的的點:
- peerEurekaNodes.start();
- registry.init(peerEurekaNodes);
2.2.1 peerEurekaNodes.start()
PeerEurekaNodes: 用於管理PeerEurekaNode節點集合。 peerEurekaNodes.start();
public void start() {
//建立一個單執行緒定時任務執行緒池:執行緒的名稱叫做Eureka-PeerNodesUpdater
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 解析Eureka Server URL,並更新PeerEurekaNodes列表
updatePeerEurekaNodes(resolvePeerUrls());
//建立任務
//任務內容為:解析Eureka Server URL,並更新PeerEurekaNodes列表
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes",e);
}
}
};
//交給執行緒池執行,執行間隔10min
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,serverConfig.getPeerEurekaNodesUpdateIntervalMs(),TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}",node.getServiceUrl());
}
}
複製程式碼
resolvePeerUrls():
解析配置的對等體URL。就是在配置檔案中配置的多個Eureka註冊中心的URL.
updatePeerEurekaNodes:
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
//計算需要移除的url= 原來-新配置。
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
//計算需要增加的url= 新配置-原來的。
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
//沒有變化就不更新
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 刪除需要移除url對應的節點。
if (!toShutdown.isEmpty()) {
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 新增需要增加的url對應的節點
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}",toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
//更新節點列表
this.peerEurekaNodes = newNodeList;
//更新節點url列表
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
複製程式碼
總結:start()方法,其實就是完成新配置的eureka叢集資訊的初始化更新工作。
2.2.2 registry.init(peerEurekaNodes)
對等節點同步器的初始化。
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
//統計最近X秒內的來自對等節點複製的續約數量(預設1秒)
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//初始化返回結果快取
initializedResponseCache();
//更新續約閥值
scheduleRenewalThresholdUpdateTask();
//初始化遠端區域註冊 相關資訊
initRemoteRegionRegistry();
...
}
複製程式碼
numberOfReplicationsLastMin.start():
啟動一個定時任務,任務名稱為Eureka-MeasureRateTimer
,每1秒統計從對等節點複製的續約數,將當前的桶的統計資料放到lastBucket,當前桶置為0
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
--
this.timer = new Timer("Eureka-MeasureRateTimer",true);
---
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate",e);
}
}
},sampleInterval,sampleInterval);
複製程式碼
注意:此統計器用於節點之間複製的統計。
initializedResponseCache():
精闢,快取來實現返回結果的快取,優秀設計啊。
使用goole cache初始化一個快取類ResponseCacheImpl
,快取(all applications,delta changes and for individual applications
)請求的結果,
此類中有兩個快取:
-
readWriteCacheMap
: 讀寫快取。初始化容量1000,失效時間3分鐘。 -
readOnlyCacheMap
:只讀快取,shouldUseReadOnlyResponseCache
屬性控制是否啟用,預設是啟用的。此快取會使用,名為Eureka-CacheFillTimer
的timer,每30s更新從readWriteCacheMap
中更新readOnlyCacheMap
中的快取值。
取值邏輯:
先從readOnlyCacheMap
取值,沒有去readWriteCacheMap
,沒有去通過CacheLoader載入,而CacheLoader會到維護應用例項註冊資訊的Map中獲取。
這裡就產生了一個疑問,為啥有搞個二級快取來快取結果呢?不是很理解。
scheduleRenewalThresholdUpdateTask()
使用名為ReplicaAwareInstanceRegistry - RenewalThresholdUpdater
的timer,每15(900s)分鐘執行updateRenewalThreshold()
任務,更新續約閥值。
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
int count = 0;
//統計所有註冊instance個數
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
當總數》預期值時 或者 關閉了自我保護模式,更新
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}",numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold",e);
}
}
複製程式碼
-
expectedNumberOfRenewsPerMin
每分鐘最大的續約數量 (30s/次,2次/s): =客戶端數量count*2 -
numberOfRenewsPerMinThreshold
每分鐘續約閾值。serverConfig.getRenewalPercentThreshold()*expectedNumberOfRenewsPerMin
serverConfig.getRenewalPercentThreshold()預設是0.85
當每分鐘續約數小於numberOfRenewsPerMinThreshold
閾值時,並且自我保護沒有關閉的情況下,開啟自我保護,此期間不剔除任何一個客戶端。(下面的EvictionTask()
驅逐任務會講到如何利用)
- InstanceRegistry初始化
- 客戶端cancle主動下線
- 客戶端註冊
- scheduleRenewalThresholdUpdateTask
此四個地方都會更新兩個值
initRemoteRegionRegistry()
初始化 遠端區域註冊 相關資訊
2.3 @PreDestroy註解的initialize方法
@PreDestroy
修飾的方法會在伺服器解除安裝Servlet的時候執行,並且只會被伺服器執行一次,被@PreDestroy
修飾的方法會Destroy
方法之後執行,在Servlet被徹底解除安裝之前.
public void shutdown() {
registry.shutdown();
peerEurekaNodes.shutdown();
}
複製程式碼
registry.shutdown();
停掉init()時啟動的定時任務
peerEurekaNodes.shutdown()
清空叢集url快取,叢集節點快取。
2.4 小結
總結:EurekaServerContext
的初始化做了很多事情,很精闢,建議多閱讀,多學習
3.EurekaServer啟動:
EurekaServerInitializerConfiguration
實現了SmartLifecycle
介面,在spring啟動後,執行start()方法
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//釋出註冊中心可以註冊事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
//狀態為執行狀態
EurekaServerInitializerConfiguration.this.running = true;
//釋出註冊中心啟動完成事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
複製程式碼
這裡重點看先EurekaServerBootstrap.contextInitialized
EurekaServerBootstrap
的contextInitialized主要乾了兩件事
initEurekaEnvironment();初始化環境
initEurekaServerContext();初始化上下文
複製程式碼
3.1 initEurekaEnvironment
主要是資料中心等環境變數的初始化
3.2 initEurekaServerContext
此方法中最重要的是
從相鄰eureka節點拷貝註冊列表資訊
int registryCount = this.registry.syncUp();
允許開始與客戶端的資料傳輸,即開始作為Server服務
this.registry.openForTraffic(this.applicationInfoManager,registryCount);
複製程式碼
3.1.1 registry.syncUp()
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
//重試次數
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//從eurekaClient獲取服務列表
Applications apps = eurekaClient.getApplications();
//遍歷註冊
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance,instance.getLeaseInfo().getDurationInSecs(),true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy",t);
}
}
}
}
return count;
}
複製程式碼
3.1.2 registry.openForTraffic
允許開始與客戶端的資料傳輸,即開始作為Server服務
InstanceRegistry.openForTraffic
public void openForTraffic(ApplicationInfoManager applicationInfoManager,int count) {
super.openForTraffic(applicationInfoManager,count == 0 ? this.defaultOpenForTrafficCount : count);
}
PeerAwareInstanceRegistryImpl.openForTraffic
public void openForTraffic(ApplicationInfoManager applicationInfoManager,int count) {
//每分鐘期待的續約數(預設30s續約,60s就是2次)
this.expectedNumberOfRenewsPerMin = count * 2;
// 每分鐘續約的閥值:0.85 * expectedNumberOfRenewsPerMin
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
....
logger.info("Changing status to UP");
//applicationInfoManager設定狀態為UP
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
複製程式碼
3.1.3 EvictionTask() 驅逐任務
protected void postInit() {
//又啟動了一個續約數統計器,此統計器用於配合驅逐任務
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}
複製程式碼
建立一個名為Eureka-EvictionTimer
的定時器來執行EvictionTask()任務。
EvictionTask()任務:
@Override
public void run() {
// 獲取延遲秒數,就是延遲幾秒下線時間。
long compensationTimeMs = getCompensationTimeMs();
//驅逐操作
evict(compensationTimeMs);
}
複製程式碼
evict()驅逐操作:清理過期租約
public void evict(long additionalLeaseMs) {
// 判斷是否開啟自我保護,自我保護期間不剔除任何任務
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
//迴圈獲得 所有過期的租約
for (Entry<String,Map<String,Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String,Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String,Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 判斷是否過期
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// 計算 最大允許清理租約數量
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// 計算 清理租約數量
int toEvict = Math.min(expiredLeases.size(),evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
// 遍歷清理。
for (int i = 0; i < toEvict; i++) {
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases,i,next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
internalCancel(appName,id,false);
}
}
}
複製程式碼
isLeaseExpirationEnabled():判斷是否開啟自我保護的兩個條件
- 自我保護配置處於開啟狀態
- 當前單位續約數(renewsLastMin統計器統計的資料)<閾值
Lease.isExpire():是否過期的判斷:
public boolean isExpired(long additionalLeaseMs) {
return (
//或者明確例項下線時間。
evictionTimestamp > 0
//或者距離最後更新時間已經過去至少3分鐘
|| System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
複製程式碼
- evictionTimestamp : 例項下線時間,當客戶端下線時,會更新這個時間
- duration : 過期間隔,預設為90秒
- lastUpdateTimestamp : 為最後更新時間
//續約時更新lastUpdateTimestamp,加上了過期間隔?
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
複製程式碼
過期時間判斷: System.currentTimeMillis()> lastUpdateTimestamp + duration + additionalLeaseMs 這裡加了兩次duration, 也就是180秒,加上延遲下線時間。也就是最少需要3分鐘才判斷下線。
3.3 小結
至此Eureka server的初始化就完成了。 這裡通過debug模式來看看初始化過程中的定時任務。
4.API介面
Eureka Server 啟動後,就是對外提供服務了。等待客戶端來註冊。
Eureka是一個基於REST(Representational State Transfer)服務,我們從官方檔案中可以看到其對外提供的介面: 官方檔案
可以推測,客戶端註冊時也是呼叫了這些介面來進行與服務端的通訊的。上文說過,Eureka 使用jersey框架來做MVC框架,暴露介面。ApplicationResource
類似springmvc中的Controller。
在com.netflix.eureka.resources
包下我們可以看到這些ApplicationResource
4.1註冊介面
ApplicationResource.addInstance
對應的就是服務註冊介面
@POST
@Consumes({"application/json","application/xml"})
public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
....
//使用PeerAwareInstanceRegistryImpl#register() 註冊例項資訊。
registry.register(info,"true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
複製程式碼
InstanceRegistry
@Override
public void register(final InstanceInfo info,final boolean isReplication) {
//釋出註冊事件,
handleRegistration(info,resolveInstanceLeaseDuration(info),isReplication);
super.register(info,isReplication);
}
PeerAwareInstanceRegistryImpl
@Override
public void register(final InstanceInfo info,final boolean isReplication) {
//租期90s
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//註冊例項
super.register(info,leaseDuration,isReplication);
//複製到其他節點。
replicateToPeers(Action.Register,info.getAppName(),info.getId(),info,null,isReplication);
}
複製程式碼
4.1.1註冊到當前Eureka
AbstractInstanceRegistry
public void register(InstanceInfo registrant,int leaseDuration,boolean isReplication) {
read.lock()讀鎖
1.從快取中獲取例項名稱對應的租約資訊
Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
2.統計數+1
REGISTER.increment(isReplication);
//gmap為null.則建立一個Map。
3.租約的處理分兩種情況:
租約已經存在:
比較新租約與舊租約的LastDirtyTimestamp,使用LastDirtyTimestamp最晚的租約
租約不存在,即新註冊:
synchronized (lock) {
更新期待每分鐘續約數
更新續約閾值
}
將租約放入appname對應的map中。
4.在最近註冊隊(recentRegisteredQueue)裡新增一個當前註冊資訊
5.狀態的處理:
將當前例項的OverriddenStatus狀態,放到Eureka Server的overriddenInstanceStatusMap;
根據OverriddenStatus狀態,設定狀態
7.例項actionType=ADDED
registrant.setActionType(ActionType.ADDED);
8. 維護recentlyChangedQueue,儲存最近操作
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
9.更新最後更新時間
registrant.setLastUpdatedTimestamp();
10.使當前例項的結果快取ResponseCache失效()
invalidateCache(registrant.getAppName(),registrant.getVIPAddress(),registrant.getSecureVipAddress());
}
複製程式碼
4.1.2複製到其他節點
此處可以看原始碼閱讀,在此不講了
4.2查詢介面
我們獲取的例項資訊,其實都是從快取中獲取的String payLoad = responseCache.get(cacheKey);
@GET
public Response getApplication(@PathParam("version") String version,@HeaderParam("Accept") final String acceptHeader,@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
if (!registry.shouldAllowAccess(false)) {
return Response.status(Status.FORBIDDEN).build();
}
EurekaMonitors.GET_APPLICATION.increment();
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
if (acceptHeader == null || !acceptHeader.contains("json")) {
keyType = Key.KeyType.XML;
}
Key cacheKey = new Key(
Key.EntityType.Application,appName,keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept)
);
String payLoad = responseCache.get(cacheKey);
if (payLoad != null) {
logger.debug("Found: {}",appName);
return Response.ok(payLoad).build();
} else {
logger.debug("Not Found: {}",appName);
return Response.status(Status.NOT_FOUND).build();
}
}
複製程式碼
總結
由於篇幅限制:
- Renew: 服務續約
- Cancel: 服務下線 不說了。
至此:Eureka服務端內容大體講完,只講了些大概,具體建議跟原始碼。
如有錯誤,敬請指出