1. 程式人生 > 實用技巧 >Eureka原始碼分析(四)

Eureka原始碼分析(四)

多級快取設計

Eureka Server存在三個變數:(registry、readWriteCacheMap、readOnlyCacheMap)儲存服務註冊資訊,預設情況下定時任務每30s將readWriteCacheMap同步至readOnlyCacheMap,每60s清理超過90s未續約的節點,Eureka Client每30s從readOnlyCacheMap更新服務註冊資訊,而客戶端服務的註冊則從registry更新服務註冊資訊。

多級快取的意義

這裡為什麼要設計多級快取呢?原因很簡單,就是當存在大規模的服務註冊和更新時,如果只是修改一個ConcurrentHashMap資料,那麼勢必因為鎖的存在導致競爭,影響效能。而Eureka又是AP模型,只需要滿足最終可用就行。所以它在這裡用到多級快取來實現讀寫分離。註冊方法寫的時候直接寫記憶體登錄檔,寫完表之後主動失效讀寫快取。獲取註冊資訊介面先從只讀快取取,只讀快取沒有再去讀寫快取取,讀寫快取沒有再去記憶體登錄檔裡取(不只是取,此處較複雜)。並且,讀寫快取會更新回寫只讀快取

  • responseCacheUpdateIntervalMs : readOnlyCacheMap 快取更新的定時器時間間隔,預設為30秒
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap 快取過期時間,預設為 180 秒。

服務註冊的快取失效

在AbstractInstanceRegistry.register方法的最後,會呼叫invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),registrant.getSecureVipAddress()); 方法,使得讀寫快取失效。

public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                     key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        readWriteCacheMap.invalidate(key);
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                             key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

定時同步快取

ResponseCacheImpl的構造方法中,會啟動一個定時任務,這個任務會定時檢查寫快取中的資料變化,進行更新和同步。

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                 key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}

服務續約

客戶端會在 initScheduledTasks 中,建立一個心跳檢測的定時任務

heartbeatTask = new TimedSupervisorTask(
    "heartbeat",
    scheduler,
    heartbeatExecutor,
    renewalIntervalInSecs,
    TimeUnit.SECONDS,
    expBackOffBound,
    new HeartbeatThread()
);
scheduler.schedule(
    heartbeatTask,
    renewalIntervalInSecs, TimeUnit.SECONDS);

HeartbeatThread

然後這個定時任務中,會執行一個 HearbeatThread 的執行緒,這個執行緒會定時呼叫renew()來做續約。

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

服務端收到心跳請求的處理

在ApplicationResource.getInstanceInfo這個介面中,會返回一個InstanceResource的例項,在該例項下,定義了一個statusUpdate的介面來更新狀態

@Path("{id}")
public InstanceResource getInstanceInfo(@PathParam("id") String id) {
    return new InstanceResource(this, id, serverConfig, registry);
}

InstanceResource.statusUpdate()

在該方法中,我們重點關注 registry.statusUpdate 這個方法,它會呼叫AbstractInstanceRegistry.statusUpdate來更新指定服務提供者在服務端儲存的資訊中的變化。

@PUT
@Path("status")
public Response statusUpdate(
    @QueryParam("value") String newStatus,
    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
    @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    try {
        if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
            logger.warn("Instance not found: {}/{}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        boolean isSuccess = registry.statusUpdate(app.getName(), id,
                                                  InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
                                                  "true".equals(isReplication));

        if (isSuccess) {
            logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
            return Response.ok().build();
        } else {
            logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
            return Response.serverError().build();
        }
    } catch (Throwable e) {
        logger.error("Error updating instance {} for status {}", id,
                     newStatus);
        return Response.serverError().build();
    }
}

AbstractInstanceRegistry.statusUpdate

在這個方法中,會拿到應用對應的例項列表,然後呼叫Lease.renew()去進行心跳續約。

public boolean statusUpdate(String appName, String id,
                            InstanceStatus newStatus, String lastDirtyTimestamp,
                            boolean isReplication) {
    try {
        read.lock();
        // 更新狀態的次數 狀態統計
        STATUS_UPDATE.increment(isReplication);
        // 從本地資料裡面獲取例項資訊,
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
            lease = gMap.get(id);
        }
        // 例項不存在,則直接返回,表示失敗
        if (lease == null) {
            return false;
        } else {
            // 執行一下lease的renew方法,裡面主要是更新了這個instance的最後更新時間
            lease.renew();
            // 獲取instance例項資訊
            InstanceInfo info = lease.getHolder();
            // Lease is always created with its instance info object.
            // This log statement is provided as a safeguard, in case this invariant is violated.
            if (info == null) {
                logger.error("Found Lease without a holder for instance id {}", id);
            }
            // 當instance資訊不為空時,並且例項狀態發生了變化
            if ((info != null) && !(info.getStatus().equals(newStatus))) {
                // 如果新狀態是UP的狀態,那麼啟動一下serviceUp() , 主要是更新服務的註冊時間
                if (InstanceStatus.UP.equals(newStatus)) {
                    lease.serviceUp();
                }
                // 將instance Id 和這個狀態的對映資訊放入覆蓋快取MAP裡面去
                overriddenInstanceStatusMap.put(id, newStatus);
                //設定覆蓋狀態到例項資訊裡面去
                info.setOverriddenStatus(newStatus);
                long replicaDirtyTimestamp = 0;
                info.setStatusWithoutDirty(newStatus);
                if (lastDirtyTimestamp != null) {
                    replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                }
                // If the replication's dirty timestamp is more than the existing one, just update
                // it to the replica's.
                //如果replicaDirtyTimestamp 的時間大於instance的getLastDirtyTimestamp() ,則更新
                if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                    info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                }
                info.setActionType(ActionType.MODIFIED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                info.setLastUpdatedTimestamp();
                //更新寫快取
                invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
            }
            return true;
        }
    } finally {
        read.unlock();
    }
}