1. 程式人生 > >Eureka-server原始碼分析

Eureka-server原始碼分析

作者:陳剛,叩丁狼高階講師。本文為原創文章,轉載請註明出處。    

我們在分析eureka-client的時候發現他是通過Http Rest 的方式做請求的,那麼eureka-server一定是基於Rest風格 類似SpringMvc一樣的模式接受處理請求。在 EurekaServerAutoConfiguration 自動配置類中有一個 jerseyFilterRegistration方法,這個方法表明eureka-server使用了Jersey實現Rest

/**
     * Register the Jersey filter
     */
    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

        return bean;
    }

這裡在註冊一個 Jersey 的 filter ,配置好相應的Filter 和 url對映等

處理服務註冊

Eureka Server對於Eureka client的REST請求的定義都位於com.netflix.eureka.resources包下,我們先來看
com.netflix.eureka.resources.ApplicationResource下面的addInstance方法,看名字就知道是新增服務例項

 @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible

InstanceInfo 引數就是客戶端提交的服務元配置資訊,
做了一大堆判斷後呼叫了com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.register: 進行註冊,我們跟蹤進去

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

繼續跟蹤 super.register :呼叫了父類的註冊方法。

 /**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }

這裡我們看到 ConcurrentHashMap<String, Lease<InstanceInfo>> 這樣的一個 執行緒安全的map去存放客戶端註冊的服務,
它是一個兩層Map結構,第一層的key儲存服務名:InstanceInfo中的appName屬性,第二層的key儲存例項名:InstanceInfo中的instanceId屬性。

服務的續約(心跳)

服務續約在 com.netflix.eureka.resources.InstanceResource#renewLease方法中

 @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

獲取到客戶端服務名字 呼叫registry.renew實現續約 ,我們跟蹤進去最終會呼叫 com.netflix.eureka.registry.AbstractInstanceRegistry#renew方法

 public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }

Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
leaseToRenew = gMap.get(id); 在獲取註冊的服務例項

instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); 在修改服務例項的狀態
leaseToRenew.renew(); 在增加租約時間

    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }

取消租約:客戶端線下

追蹤 InstanceResource#cancelLease() 方法,最終會呼叫 com.netflix.eureka.registry.AbstractInstanceRegistry#internalCancel

 protected boolean internalCancel(String appName, String id, boolean isReplication) {
        boolean var7;
        try {
            this.read.lock();
            EurekaMonitors.CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = (Lease)gMap.remove(id);
            }
  ......

gMap.remove(id); 就是獲取當前要下線的服務id,然後從註冊的map中移除掉

服務端的請求接收差不多,對於其他的服務端處理這裡就不在多說,可執行斷點跟蹤。