1. 程式人生 > >Spring Cloud Eureka原始碼分析---服務註冊

Spring Cloud Eureka原始碼分析---服務註冊

本篇我們著重分析Eureka服務端的邏輯實現,主要涉及到服務的註冊流程分析。

在Eureka的服務治理中,會涉及到下面一些概念:

服務註冊:Eureka Client會通過傳送REST請求的方式向Eureka Server註冊自己的服務,提供自身的元資料,比如 IP 地址、埠、執行狀況指標的URL、主頁地址等資訊。Eureka Server接收到註冊請求後,就會把這些元資料資訊儲存在一個ConcurrentHashMap中。

服務續約:在服務註冊後,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處於可用狀態,防止被剔除。Eureka Client在預設的情況下會每隔30秒傳送一次心跳來進行服務續約。

服務同步:Eureka Server之間會互相進行註冊,構建Eureka Server叢集,不同Eureka Server之間會進行服務同步,用來保證服務資訊的一致性。

獲取服務:服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲取上面註冊的服務清單,並且快取在Eureka Client本地,預設快取30秒。同時,為了效能考慮,Eureka Server也會維護一份只讀的服務清單快取,該快取每隔30秒更新一次。

服務呼叫:服務消費者在獲取到服務清單後,就可以根據清單中的服務列表資訊,查詢到其他服務的地址,從而進行遠端呼叫。Eureka有Region和Zone的概念,一個Region可以包含多個Zone,在進行服務呼叫時,優先訪問處於同一個Zone中的服務提供者。

服務下線:當Eureka Client需要關閉或重啟時,就不希望在這個時間段內再有請求進來,所以,就需要提前先發送REST請求給Eureka Server,告訴Eureka Server自己要下線了,Eureka Server在收到請求後,就會把該服務狀態置為下線(DOWN),並把該下線事件傳播出去。

服務剔除:有時候,服務例項可能會因為網路故障等原因導致不能提供服務,而此時該例項也沒有傳送請求給Eureka Server來進行服務下線,所以,還需要有服務剔除的機制。Eureka Server在啟動的時候會建立一個定時任務,每隔一段時間(預設60秒),從當前服務清單中把超時沒有續約(預設90秒)的服務剔除。

自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網路一段時間內發生了異常,所有的服務都沒能夠進行續約,Eureka Server就把所有的服務都剔除了,這樣顯然不太合理。所以,就有了自我保護機制,當短時間內,統計續約失敗的比例,如果達到一定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常後,再退出自我保護機制。

1. 基本原理:

  1. Eureka Server 提供服務註冊服務,各個節點啟動後,會在Eureka Server中進行註冊,這樣Eureka Server中的服務登錄檔中將會儲存所有可用服務節點的資訊,服務節點的資訊可以在介面中直觀的看到;
  2. Eureka Client 是一個Java 客戶端,用於簡化與Eureka Server的互動,客戶端同時也具備一個內建的、使用輪詢負載演算法的負載均衡器;
  3. 在應用啟動後,將會向Eureka Server傳送心跳(預設週期為30秒),如果Eureka Server在多個心跳週期沒有收到某個節點的心跳,Eureka Server 將會從服務登錄檔中把這個服務節點移除(預設90秒);
  4. Eureka Server之間將會通過複製的方式完成資料的同步;
  5. Eureka Client具有快取的機制,即使所有的Eureka Server 都掛掉的話,客戶端依然可以利用快取中的資訊消費其它服務的API;

上一篇中我們搭建了一個簡單的Eureka客戶端和服務端。如果你有啟動過觀看啟動日誌不難發現:

這裡有個EurekaServerBootstrap類,啟動日誌中給出:Setting the eureka configuration..,Initialized server context。看起來這個應該是個啟動類,跟進去看一下,有個很顯眼的方法:

這個方法的呼叫先按住不表,我們先從啟動類上新增的 EnableEurekaServer註解著手,看看為什麼添加了一個註解就能啟用 Rureka。

從server啟動類上的EnableEurekaServer註解進入:

  1. 接下來引用了EurekaServerMarkerConfiguration,看到在這個註解上有個註釋:啟用這個註解的目的是為了啟用:EurekaServerAutoConfiguration類;

  2. 進入EurekaServerAutoConfiguration看到在類頭部有一個註解:

    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)

    EurekaServerAutoConfiguration啟動的條件是EurekaServerMarkerConfiguration註解先載入。

上面這一張圖示識出了從啟動註解到預啟動類的流程,但是你會發現實際上 EurekaServerAutoConfiguration 也沒有做什麼事情:配置初始化,啟動一些基本的過濾器。同樣在類頭部的引用上有一個Import註解:

@Import(EurekaServerInitializerConfiguration.class)

所以在 EurekaServerAutoConfiguration 初始化的時候,會引用到 EurekaServerInitializerConfiguration,啟用它的初始化。EurekaServerInitializerConfiguration 實現了SmartLifecycle.start方法,在spring 初始化的時候會被啟動,啟用 run 方法。可以看到在 run 方法中呼叫的就是:

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);

即我們上面截圖中的EurekaServerBootstrap.contextInitialized()方法。

整體的呼叫流程如下:

具體的初始化資訊見下圖:

2. 服務註冊實現

2.1 server端啟動時同步別的server上的client

在上面講到Eureka server啟動過程中,啟動一個Eureka Client的時候,initEurekaServerContext()裡面會進行服務同步和服務剔除,syncUp()方法所屬的類是:PeerAwareInstanceRegistry,即server端的服務註冊邏輯都在這裡面。因為沒有使用AWS的伺服器,所以預設例項化的實現類為:PeerAwareInstanceRegistryImpl。

PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
    registry = new AwsInstanceRegistry(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
    awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
    awsBinder.start();
} else {
    registry = new PeerAwareInstanceRegistryImpl(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
}

PeerAwareInstanceRegistryImpl 繼承了一個抽象類 AbstractInstanceRegistry:

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    
}

AbstractInstanceRegistry中的實現邏輯是真正的服務註冊儲存所在地:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class);

    private static final String[] EMPTY_STR_ARRAY = new String[0];
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
    protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
            .newBuilder().initialCapacity(500)
            .expireAfterAccess(1, TimeUnit.HOURS)
            .<String, InstanceStatus>build().asMap();

    
 ....
 ....
 ....
}

所有的服務例項資訊都儲存在 server 本地的map當中。所以在server端啟動的時候會去拉別的server上儲存的client例項,然後儲存到本地快取。

2.2 client主動註冊

如果是某個client主動發出了註冊請求,那麼是如何註冊到服務端呢?

還是檢視日誌:啟動服務端,然後再啟動客戶端,檢視服務端日誌:

這裡能看到剛才啟動的客戶端已經在服務端註冊了,註冊邏輯走的類是:AbstractInstanceRegistry。

上面也提到 是服務註冊的邏輯實現類,完成儲存客戶端資訊的方法是:

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        ......
    }

程式碼就不貼了,主要實現的邏輯是儲存當前註冊的客戶端資訊。我們知道客戶端是傳送了一次http請求給服務端,那麼真正的註冊邏輯應該是從一個http請求的接收處進來的。跟著使用了register方法的地方去找,PeerAwareInstanceRegistryImpl裡面有呼叫:

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    //將新節點資訊告訴別的服務端
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

這裡沒有改寫父類的register邏輯,下面還多了一句:replicateToPeers,這裡主要做的邏輯是:給兄弟 server節點發送register 請求,告訴他們有客戶端來註冊。

繼續看誰呼叫了這裡,可以找到:ApplicationResource 的addInstance方法呼叫了:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
......
  registry.register(info, "true".equals(isReplication));
  return Response.status(204).build();  // 204 to be backwards compatible
}

而這裡很顯然是一個介面,邏輯就很清晰了:

另外,我們檢視addInstance方法被誰呼叫的過程中發現:PeerReplicationResource--->batchReplication 方法也呼叫了註冊的邏輯。

這個方法一看竟然解答了之前我的疑惑:服務端之間是如何傳送心跳的。原來實現是在這裡。通過dispatch方法來區分當前的呼叫是何種請求,

可以看到,服務註冊,心跳檢測,服務取消,服務下線,服務剔除的入口都在這裡:

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error("{} request processing failed for batch item {}/{}",
                             instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}


private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
        ApplicationResource applicationResource = createApplicationResource(instanceInfo);
        InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);

        String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
        String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
        String instanceStatus = toString(instanceInfo.getStatus());

        Builder singleResponseBuilder = new Builder();
        switch (instanceInfo.getAction()) {
            case Register:
                singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                break;
            case Heartbeat:
                singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                break;
            case Cancel:
                singleResponseBuilder = handleCancel(resource);
                break;
            case StatusUpdate:
                singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                break;
            case DeleteStatusOverride:
                singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                break;
        }
        return singleResponseBuilder.build();
    }

從這個入口進去,大家可以跟蹤一下感興趣的邏輯