Spring Cloud Eureka原始碼分析---服務註冊
本篇我們著重分析Eureka服務端的邏輯實現,主要涉及到服務的註冊流程分析。
在Eureka的服務治理中,會涉及到下面一些概念:
服務註冊:Eureka Client會通過傳送REST請求的方式向Eureka Server註冊自己的服務,提供自身的元資料,比如 IP 地址、埠、執行狀況指標的URL、主頁地址等資訊。Eureka Server接收到註冊請求後,就會把這些元資料資訊儲存在一個ConcurrentHashMap中。
服務續約:在服務註冊後,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處於可用狀態,防止被剔除。Eureka Client在預設的情況下會每隔30秒傳送一次心跳來進行服務續約。
服務同步:Eureka Server之間會互相進行註冊,構建Eureka Server叢集,不同Eureka Server之間會進行服務同步,用來保證服務資訊的一致性。
獲取服務:服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲取上面註冊的服務清單,並且快取在Eureka Client本地,預設快取30秒。同時,為了效能考慮,Eureka Server也會維護一份只讀的服務清單快取,該快取每隔30秒更新一次。
服務呼叫:服務消費者在獲取到服務清單後,就可以根據清單中的服務列表資訊,查詢到其他服務的地址,從而進行遠端呼叫。Eureka有Region和Zone的概念,一個Region可以包含多個Zone,在進行服務呼叫時,優先訪問處於同一個Zone中的服務提供者。
服務下線:當Eureka Client需要關閉或重啟時,就不希望在這個時間段內再有請求進來,所以,就需要提前先發送REST請求給Eureka Server,告訴Eureka Server自己要下線了,Eureka Server在收到請求後,就會把該服務狀態置為下線(DOWN),並把該下線事件傳播出去。
服務剔除:有時候,服務例項可能會因為網路故障等原因導致不能提供服務,而此時該例項也沒有傳送請求給Eureka Server來進行服務下線,所以,還需要有服務剔除的機制。Eureka Server在啟動的時候會建立一個定時任務,每隔一段時間(預設60秒),從當前服務清單中把超時沒有續約(預設90秒)的服務剔除。
自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網路一段時間內發生了異常,所有的服務都沒能夠進行續約,Eureka Server就把所有的服務都剔除了,這樣顯然不太合理。所以,就有了自我保護機制,當短時間內,統計續約失敗的比例,如果達到一定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常後,再退出自我保護機制。
1. 基本原理:
- Eureka Server 提供服務註冊服務,各個節點啟動後,會在Eureka Server中進行註冊,這樣Eureka Server中的服務登錄檔中將會儲存所有可用服務節點的資訊,服務節點的資訊可以在介面中直觀的看到;
- Eureka Client 是一個Java 客戶端,用於簡化與Eureka Server的互動,客戶端同時也具備一個內建的、使用輪詢負載演算法的負載均衡器;
- 在應用啟動後,將會向Eureka Server傳送心跳(預設週期為30秒),如果Eureka Server在多個心跳週期沒有收到某個節點的心跳,Eureka Server 將會從服務登錄檔中把這個服務節點移除(預設90秒);
- Eureka Server之間將會通過複製的方式完成資料的同步;
- Eureka Client具有快取的機制,即使所有的Eureka Server 都掛掉的話,客戶端依然可以利用快取中的資訊消費其它服務的API;
上一篇中我們搭建了一個簡單的Eureka客戶端和服務端。如果你有啟動過觀看啟動日誌不難發現:
這裡有個EurekaServerBootstrap
類,啟動日誌中給出:Setting the eureka configuration..,Initialized server context
。看起來這個應該是個啟動類,跟進去看一下,有個很顯眼的方法:
這個方法的呼叫先按住不表,我們先從啟動類上新增的 EnableEurekaServer
註解著手,看看為什麼添加了一個註解就能啟用 Rureka。
從server啟動類上的EnableEurekaServer
註解進入:
接下來引用了
EurekaServerMarkerConfiguration
,看到在這個註解上有個註釋:啟用這個註解的目的是為了啟用:EurekaServerAutoConfiguration類;進入EurekaServerAutoConfiguration看到在類頭部有一個註解:
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
即
EurekaServerAutoConfiguration
啟動的條件是EurekaServerMarkerConfiguration
註解先載入。
上面這一張圖示識出了從啟動註解到預啟動類的流程,但是你會發現實際上 EurekaServerAutoConfiguration 也沒有做什麼事情:配置初始化,啟動一些基本的過濾器。同樣在類頭部的引用上有一個Import註解:
@Import(EurekaServerInitializerConfiguration.class)
所以在 EurekaServerAutoConfiguration 初始化的時候,會引用到 EurekaServerInitializerConfiguration,啟用它的初始化。EurekaServerInitializerConfiguration 實現了SmartLifecycle.start方法,在spring 初始化的時候會被啟動,啟用 run 方法。可以看到在 run 方法中呼叫的就是:
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
即我們上面截圖中的EurekaServerBootstrap.contextInitialized()
方法。
整體的呼叫流程如下:
具體的初始化資訊見下圖:
2. 服務註冊實現
2.1 server端啟動時同步別的server上的client
在上面講到Eureka server啟動過程中,啟動一個Eureka Client的時候,initEurekaServerContext()
裡面會進行服務同步和服務剔除,syncUp()方法所屬的類是:PeerAwareInstanceRegistry,即server端的服務註冊邏輯都在這裡面。因為沒有使用AWS的伺服器,所以預設例項化的實現類為:PeerAwareInstanceRegistryImpl。
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
PeerAwareInstanceRegistryImpl 繼承了一個抽象類 AbstractInstanceRegistry:
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
}
AbstractInstanceRegistry中的實現邏輯是真正的服務註冊儲存所在地:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class);
private static final String[] EMPTY_STR_ARRAY = new String[0];
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
.newBuilder().initialCapacity(500)
.expireAfterAccess(1, TimeUnit.HOURS)
.<String, InstanceStatus>build().asMap();
....
....
....
}
所有的服務例項資訊都儲存在 server 本地的map當中。所以在server端啟動的時候會去拉別的server上儲存的client例項,然後儲存到本地快取。
2.2 client主動註冊
如果是某個client主動發出了註冊請求,那麼是如何註冊到服務端呢?
還是檢視日誌:啟動服務端,然後再啟動客戶端,檢視服務端日誌:
這裡能看到剛才啟動的客戶端已經在服務端註冊了,註冊邏輯走的類是:AbstractInstanceRegistry。
上面也提到 是服務註冊的邏輯實現類,完成儲存客戶端資訊的方法是:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
......
}
程式碼就不貼了,主要實現的邏輯是儲存當前註冊的客戶端資訊。我們知道客戶端是傳送了一次http請求給服務端,那麼真正的註冊邏輯應該是從一個http請求的接收處進來的。跟著使用了register方法的地方去找,PeerAwareInstanceRegistryImpl裡面有呼叫:
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
//將新節點資訊告訴別的服務端
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
這裡沒有改寫父類的register邏輯,下面還多了一句:replicateToPeers,這裡主要做的邏輯是:給兄弟 server節點發送register 請求,告訴他們有客戶端來註冊。
繼續看誰呼叫了這裡,可以找到:ApplicationResource 的addInstance方法呼叫了:
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
......
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
而這裡很顯然是一個介面,邏輯就很清晰了:
另外,我們檢視addInstance方法被誰呼叫的過程中發現:PeerReplicationResource--->batchReplication 方法也呼叫了註冊的邏輯。
這個方法一看竟然解答了之前我的疑惑:服務端之間是如何傳送心跳的。原來實現是在這裡。通過dispatch方法來區分當前的呼叫是何種請求,
可以看到,服務註冊,心跳檢測,服務取消,服務下線,服務剔除的入口都在這裡:
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
try {
ReplicationListResponse batchResponse = new ReplicationListResponse();
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
try {
batchResponse.addResponse(dispatch(instanceInfo));
} catch (Exception e) {
batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
logger.error("{} request processing failed for batch item {}/{}",
instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
}
}
return Response.ok(batchResponse).build();
} catch (Throwable e) {
logger.error("Cannot execute batch Request", e);
return Response.status(Status.INTERNAL_SERVER_ERROR).build();
}
}
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
ApplicationResource applicationResource = createApplicationResource(instanceInfo);
InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
String instanceStatus = toString(instanceInfo.getStatus());
Builder singleResponseBuilder = new Builder();
switch (instanceInfo.getAction()) {
case Register:
singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
break;
case Heartbeat:
singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
break;
case Cancel:
singleResponseBuilder = handleCancel(resource);
break;
case StatusUpdate:
singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
break;
case DeleteStatusOverride:
singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
break;
}
return singleResponseBuilder.build();
}
從這個入口進去,大家可以跟蹤一下感興趣的邏輯