Eureka原始碼分析(四)
多級快取設計
Eureka Server存在三個變數:(registry、readWriteCacheMap、readOnlyCacheMap)儲存服務註冊資訊,預設情況下定時任務每30s將readWriteCacheMap同步至readOnlyCacheMap,每60s清理超過90s未續約的節點,Eureka Client每30s從readOnlyCacheMap更新服務註冊資訊,而客戶端服務的註冊則從registry更新服務註冊資訊。
多級快取的意義
這裡為什麼要設計多級快取呢?原因很簡單,就是當存在大規模的服務註冊和更新時,如果只是修改一個ConcurrentHashMap資料,那麼勢必因為鎖的存在導致競爭,影響效能。而Eureka又是AP模型,只需要滿足最終可用就行。所以它在這裡用到多級快取來實現讀寫分離。註冊方法寫的時候直接寫記憶體登錄檔,寫完表之後主動失效讀寫快取。獲取註冊資訊介面先從只讀快取取,只讀快取沒有再去讀寫快取取,讀寫快取沒有再去記憶體登錄檔裡取(不只是取,此處較複雜)。並且,讀寫快取會更新回寫只讀快取
- responseCacheUpdateIntervalMs : readOnlyCacheMap 快取更新的定時器時間間隔,預設為30秒
- responseCacheAutoExpirationInSeconds : readWriteCacheMap 快取過期時間,預設為 180 秒。
服務註冊的快取失效
在AbstractInstanceRegistry.register方法的最後,會呼叫invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),registrant.getSecureVipAddress()); 方法,使得讀寫快取失效。
public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection<Key> keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } }
定時同步快取
ResponseCacheImpl的構造方法中,會啟動一個定時任務,這個任務會定時檢查寫快取中的資料變化,進行更新和同步。
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
服務續約
客戶端會在 initScheduledTasks 中,建立一個心跳檢測的定時任務
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
HeartbeatThread
然後這個定時任務中,會執行一個 HearbeatThread 的執行緒,這個執行緒會定時呼叫renew()來做續約。
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
服務端收到心跳請求的處理
在ApplicationResource.getInstanceInfo這個介面中,會返回一個InstanceResource的例項,在該例項下,定義了一個statusUpdate的介面來更新狀態
@Path("{id}")
public InstanceResource getInstanceInfo(@PathParam("id") String id) {
return new InstanceResource(this, id, serverConfig, registry);
}
InstanceResource.statusUpdate()
在該方法中,我們重點關注 registry.statusUpdate 這個方法,它會呼叫AbstractInstanceRegistry.statusUpdate來更新指定服務提供者在服務端儲存的資訊中的變化。
@PUT
@Path("status")
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
boolean isSuccess = registry.statusUpdate(app.getName(), id,
InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
"true".equals(isReplication));
if (isSuccess) {
logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
return Response.ok().build();
} else {
logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
return Response.serverError().build();
}
} catch (Throwable e) {
logger.error("Error updating instance {} for status {}", id,
newStatus);
return Response.serverError().build();
}
}
AbstractInstanceRegistry.statusUpdate
在這個方法中,會拿到應用對應的例項列表,然後呼叫Lease.renew()去進行心跳續約。
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// 更新狀態的次數 狀態統計
STATUS_UPDATE.increment(isReplication);
// 從本地資料裡面獲取例項資訊,
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
// 例項不存在,則直接返回,表示失敗
if (lease == null) {
return false;
} else {
// 執行一下lease的renew方法,裡面主要是更新了這個instance的最後更新時間
lease.renew();
// 獲取instance例項資訊
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
// 當instance資訊不為空時,並且例項狀態發生了變化
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// 如果新狀態是UP的狀態,那麼啟動一下serviceUp() , 主要是更新服務的註冊時間
if (InstanceStatus.UP.equals(newStatus)) {
lease.serviceUp();
}
// 將instance Id 和這個狀態的對映資訊放入覆蓋快取MAP裡面去
overriddenInstanceStatusMap.put(id, newStatus);
//設定覆蓋狀態到例項資訊裡面去
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
//如果replicaDirtyTimestamp 的時間大於instance的getLastDirtyTimestamp() ,則更新
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
info.setActionType(ActionType.MODIFIED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
info.setLastUpdatedTimestamp();
//更新寫快取
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}