1. 程式人生 > >【一起學原始碼-微服務】Nexflix Eureka 原始碼六:在眼花繚亂的程式碼中,EurekaClient是如何註冊的?

【一起學原始碼-微服務】Nexflix Eureka 原始碼六:在眼花繚亂的程式碼中,EurekaClient是如何註冊的?

前言

上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。

這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於EurekaClient註冊的程式碼,真的不是這麼容易被發現的。

如若轉載 請標明來源:一枝花算不算浪漫

原始碼分析

如果是看過前面文章的同學,肯定會知道,Eureka Client啟動流程最後是初始化DiscoveryClient這個類,那麼我們就直接從這個類開始分析,後面程式碼都只擷取重要程式碼,具體大家可以自行參照原始碼。

DiscoveryClient.java

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // 省略部分程式碼...

    this.applicationInfoManager = applicationInfoManager;
    // 建立一個配置例項,這裡面會有eureka的各種資訊,看InstanceInfo類的註釋為:The class that holds information required for registration with Eureka Server 
    // and to be discovered by  other components.
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    // 省略部分程式碼...
    

    try {
        // 支援底層的eureka client跟eureka server進行網路通訊的元件
        eurekaTransport = new EurekaTransport();
        // 傳送http請求,呼叫restful介面
        scheduleServerEndpointTask(eurekaTransport, args);
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    
    // 初始化排程任務
    initScheduledTasks();
}

上面省略了很多程式碼,這段程式碼在之前的幾篇文章也都有提及,說實話看到這裡 仍然一臉悶逼,入冊的入口在哪呢?不急,下面慢慢分析。

DiscoveryClient.java

private void initScheduledTasks() {
    // 省略大部分程式碼,這段程式碼是初始化eureka client的一些排程任務

    // InstanceInfo replicator
    // 建立服務拷貝副本
    instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize

    // 執行執行緒 InitialInstanceInfoReplicationIntervalSeconds預設為40s
    instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

上面仍然是DiscoveryClient中的原始碼,看方法名我們知道這裡肯定是初始化EurekaClient啟動時的相關定時任務的。
這裡主要是截取了instanceInfoReplicator初始化和執行instanceInfoReplicator.start的任務,

接著我們就可以順著這個線先看看InstatnceInfoReplicator是何方神聖?

/**
 * A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
 * - configured with a single update thread to guarantee sequential update to the remote server
 * - update tasks can be scheduled on-demand via onDemandUpdate()
 * - task processing is rate limited by burstSize
 * - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
 *   is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
 *   on-demand update).
 *
 * 用於將本地instanceinfo更新和複製到遠端伺服器的任務。此任務的屬性是:
 * -配置有單個更新執行緒以保證對遠端伺服器的順序更新
 * -可以通過onDemandUpdate()按需排程更新任務
 * -任務處理的速率受burstSize的限制
 * -新更新總是在較早的更新任務之後自動計劃任務。但是,如果啟動了按需任務*,則計劃的自動更新任務將被丟棄(並且將在新的按需更新之後安排新的任務)
 */
class InstanceInfoReplicator implements Runnable {

}

這裡有兩個關鍵點:

  1. 此類實現了Runnable介面,說白了就是執行一個非同步執行緒
  2. 該類作用是:用於將本地instanceinfo更新和複製到遠端伺服器的任務

看完這兩點,我又不禁陷入思考,我找的是eurekaClient註冊過程,咋還跑到這個裡面來了?不甘心,於是繼續往下看。

InstanceInfoReplicator.start()

public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty();  // for initial register
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

這個scheduler是一個排程任務執行緒池,會將this執行緒放入到執行緒池中,然後再指定時間後執行該執行緒的run方法。

InstanceInfoReplicator.run()

public void run() {
    try {
        // 重新整理一下服務例項資訊
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

看到這裡是不是有種豁然開朗的感覺?看到了register就感覺到希望來了,這裡使用的是DiscoveryClient.register方法,其實這裡我們也可以先找DiscoveryClient中的register方法,然後再反查呼叫方,這也是一種好的思路呀。

DiscoveryClient.register

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        // 回看eurekaTransport建立及初始化過程
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

這裡是使用eurekaTransport.registrationClient去進行註冊,我們在最開始DiscoveryClient構造方法中已經截取了eurekaTransport建立及初始化程式碼,這裡再貼一下:

// 支援底層的eureka client跟eureka server進行網路通訊的元件
eurekaTransport = new EurekaTransport();
// 傳送http請求,呼叫restful介面
scheduleServerEndpointTask(eurekaTransport, args);


private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {

    // 省略大量程式碼

    // 如果需要抓取登錄檔,讀取其他server的註冊資訊
    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        // 將newRegistrationClient放入到eurekaTransport中
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }
}

到了這裡,可以看到eurekaTransport.registrationClient實際就是EurekaHttpClient,不知道是我沒找對地方還是什麼原因,我並沒有找到具體執行的實現類。

最後網上查了下,具體執行的實現類是:AbstractJersey2EurekaHttpClient

AbstractJersey2EurekaHttpClient.register()

public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        Response response = null;
        try {
            // 傳送請求,類似於:http://localhost:8080/v2/apps/ServiceA
            // 傳送的是post請求,服務例項的物件被打成了一個json傳送,包括自己的主機、ip、埠號
            // eureka server 就知道了這個ServiceA這個服務,有一個服務例項,比如是在192.168.31.109、host-01、8761埠
            Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
            addExtraProperties(resourceBuilder);
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .accept(MediaType.APPLICATION_JSON)
                    .acceptEncoding("gzip")
                    .post(Entity.json(info));
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

到了這裡就已經真相大白了,可是 讀了一通發現這個程式碼實在是不好理解,最後再總結一波才行。。。

總結

(1)DiscoveryClient建構函式會初始化EurekaClient相關的定時任務,定時任務裡面會啟動instanceInfo 互相複製的任務,就是InstanceInfoReplicator中的start()

(2)InstanceInfoReplicator的start()方法裡面,將自己作為一個執行緒放到一個排程執行緒池中去了,預設是延遲40s去執行這個執行緒,還將isDirty設定為了ture

(3)如果執行執行緒的時候,是執行run()方法,執行緒

(3)先是找EurekaClient.refreshInstanceInfo()這個方法,裡面其實是呼叫ApplicationInfoManager的一些方法重新整理了一下服務例項的配置,看看配置有沒有改變,如果改變了,就重新整理一下;用健康檢查器,檢查了一下狀態,將狀態設定到了ApplicationInfoManager中去,更新服務例項的狀態

(4)因為之前設定過isDirty,所以這裡會執行進行服務註冊

(5)服務註冊的時候,是基於EurekaClient的reigster()方法去註冊的,呼叫的是底層的TransportClient的RegistrationClient,執行了register()方法,將InstanceInfo服務例項的資訊,通過http請求,呼叫eureka server對外暴露的一個restful介面,將InstanceInfo給傳送了過去。這裡找的是EurekaTransport,在構造的時候,呼叫了scheduleServerEndpointTask()方法,這個方法裡就初始化了專門用於註冊的RegistrationClient。

(6)找SessionedEurekaHttpClient呼叫register()方法,去進行註冊,底層最終使用的AbstractJersey2EurekaHttpClient的register方法實現的

(7)eureka大量的基於jersey框架,在eureka server上提供restful介面,在eureka client如果要傳送請求到eureka server的話,一定是基於jersey框架,去傳送的http restful介面呼叫的請求

(8)真正執行註冊請求的,就是eureka-client-jersey2工程裡的AbstractJersey2EurekaHttpClient,請求http://localhost:8080/v2/apps/ServiceA,將服務例項的資訊傳送過去

eureka client這一塊,在服務註冊的這塊程式碼,很多槽點:

(1)服務註冊,不應該放在InstanceInfoReplicator裡面,語義不明朗

(2)負責傳送請求的HttpClient,類體系過於複雜,導致人根本就找不到對應的Client,最後是根據他是使用jersey框架來進行restful介面暴露和呼叫,才能連蒙帶猜,找到真正傳送服務註冊請求的地方

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

相關推薦

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取

前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /

一起原始碼-服務Nexflix Eureka 原始碼EurekaServer啟動之EurekaServer上下文EurekaClient建立

前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli

一起原始碼-服務Nexflix Eureka 原始碼眼花繚亂程式碼EurekaClient是如何註冊的?

前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka

一起原始碼-服務Nexflix Eureka 原始碼通過單元測試來Debug Eureka註冊過程

前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。

一起原始碼-服務Nexflix Eureka 原始碼EurekaClient登錄檔抓取 精妙設計分析!

前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI

一起原始碼-服務Nexflix Eureka 原始碼服務續約原始碼分析

前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著

一起原始碼-服務Nexflix Eureka 原始碼服務下線及例項摘除一個client下線到底多久才會被其他例項感知?

前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註

一起原始碼-服務Nexflix Eureka 原始碼十一EurekaServer自我保護機制竟然有這麼多Bug?

前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下

一起原始碼-服務Nexflix Eureka 原始碼十二EurekaServer叢集模式原始碼分析

前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收

一起原始碼-服務Nexflix Eureka 原始碼十三Eureka原始碼解讀完結撒花篇~!

前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的

一起原始碼-服務Ribbon 原始碼Ribbon概念理解及Demo除錯

前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類

一起原始碼-服務Ribbon 原始碼通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析

前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的

一起原始碼-服務Ribbon 原始碼Ribbon與Eureka整合原理分析

前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下

一起原始碼-服務Ribbon 原始碼進一步探究Ribbon的IRule和IPing

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Ribbon原始碼Ribbon原始碼解讀彙總篇~

前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F

一起原始碼-服務Feign 原始碼原始碼初探通過Demo Debug Feign原始碼

前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進

一起原始碼-服務Feign 原始碼Feign動態代理構造過程

前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr

一起原始碼-服務Feign 原始碼Feign結合Ribbon實現負載均衡的原理分析

前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d

一起原始碼-服務Hystrix 原始碼Hystrix基礎原理與Demo搭建

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe

一起原始碼-服務Hystrix 原始碼Hystrix核心流程Hystix非降級邏輯流程梳理

說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr