【一起學原始碼-微服務】Nexflix Eureka 原始碼九:服務續約原始碼分析
前言
前情回顧
上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。
本講目錄
今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著。現在很多分散式系統都會有心跳檢查的機制,這裡一起來學習下Eureka是怎麼做心跳檢查的。
目錄如下:
- client端心跳檢查排程任務
- server端接收心跳檢查,設定最後renew時間
這一講內容不太多,因為上一篇文章寫全量和增量登錄檔資訊內容有點多,所以這裡將部落格儘量一篇保持一個知識點,後面還會講服務例項下線、摘除、註冊中心自我保護等機制的實現原理。
說明
原創不易,如若轉載 請標明來源:一枝花算不算浪漫
原始碼分析
client端心跳檢查排程任務
服務例項續約程式碼比較簡單,這裡還是從DiscovertClient.java
開始,很多原始碼的入口都是在這裡,因為client端初始化、註冊 都是走的這裡,因為前幾篇文章對這個類已經分析很多了,這裡只擷取部分重要程式碼:
DiscovertClient.java
初始化後 會繼續初始化一些排程任務:
private void initScheduledTasks() { if (clientConfig.shouldRegisterWithEureka()) { // 預設也是30s int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer // 執行heartbeatExecutor心跳檢查,預設是30s scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // 執行執行緒 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } } public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; Response response = null; try { WebTarget webResource = jerseyClient.target(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.request(); addExtraProperties(requestBuilder); addExtraHeaders(requestBuilder); requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE); response = requestBuilder.put(Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE)); // Jersey2 refuses to handle PUT with no body EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.readEntity(InstanceInfo.class)); } return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey2 HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
這裡的流程很簡單,初始化DiscoveryClient
後會新建一個排程任務,然後執行HeartbeatThread
中的run方法,預設是renewalIntervalInSecs
30s執行一次。
具體就是給Server端傳送一個http請求,類似於:http://localhost:8080/v2/apps/ServiceA/i-000000-1
, 走的是put請求。
最後拿到響應結果,續約成功後會更新lastSuccessfulHeartbeatTimestamp
最近成功心跳檢測的時間戳。
server端接收心跳檢查請求
前幾篇文章已經說過,Server端接收http請求的入口在eureka-core
模組下的 resource
ApplicationResource.java
中的getInstanceInfo
方法,這裡直接請求的InstanceResource
類的構造方法,找到這個方法中的@PUT
請求。可以直接看下程式碼:
InstanceResource.renewLease
+AbstractInstanceRegistry.renew
方法:
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// 省略部分程式碼
logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
return response;
}
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
Object[] args = {
instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId()
};
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", args);
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
這裡主要看renew
方法, 這裡看到registry
是一個登錄檔,通過appName獲取對應的服務登錄檔資訊。
這裡主要還是看leaseToRenew.renew()
其實很簡單,就是設定當前示例登錄檔的renew屬性的lastUpdateTimestamp
為最新時間+duration。
至於這裡的duration 我們下一講會詳細講解,duration 和服務例項摘除有關。
總結
(1)DiscoveryClient初始化的時候,會去排程一堆定時任務,其中有一個就是HeartbeatThread,心跳執行緒
(2)在這裡可以看到,預設是每隔30秒去傳送一次心跳,每隔30秒執行一次HeartbeatTHread執行緒的邏輯,傳送心跳
(3)這邊的話就是去傳送這個心跳,走的是EurekaHttpClient的sendHeartbeat()方法,http://localhost:8080/v2/apps/ServiceA/i-000000-1,走的是put請求
(4)負責承接服務例項的心跳相關的這些操作的,是ApplicationsResource,服務相關的controller。找到ApplicationResource,再次找到InstanceResource,通過PUT請求,可以找到renewLease方法。
(5)通過登錄檔的renew()方法,進去完成服務續約,實際進入AbstractInstanceRegistry的renew()方法
(6)從登錄檔的map中,根據服務名和例項id,獲取一個Lease
申明
本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫
相關推薦
【一起學原始碼-微服務】Nexflix Eureka 原始碼二:EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取
前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /
【一起學原始碼-微服務】Nexflix Eureka 原始碼三:EurekaServer啟動之EurekaServer上下文EurekaClient建立
前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli
【一起學原始碼-微服務】Nexflix Eureka 原始碼六:在眼花繚亂的程式碼中,EurekaClient是如何註冊的?
前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka
【一起學原始碼-微服務】Nexflix Eureka 原始碼七:通過單元測試來Debug Eureka註冊過程
前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。
【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!
前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI
【一起學原始碼-微服務】Nexflix Eureka 原始碼九:服務續約原始碼分析
前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著
【一起學原始碼-微服務】Nexflix Eureka 原始碼十:服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?
前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註
【一起學原始碼-微服務】Nexflix Eureka 原始碼十一:EurekaServer自我保護機制竟然有這麼多Bug?
前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下
【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析
前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收
【一起學原始碼-微服務】Nexflix Eureka 原始碼十三:Eureka原始碼解讀完結撒花篇~!
前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的
【一起學原始碼-微服務】Ribbon 原始碼一:Ribbon概念理解及Demo除錯
前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類
【一起學原始碼-微服務】Ribbon 原始碼二:通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析
前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的
【一起學原始碼-微服務】Ribbon 原始碼三:Ribbon與Eureka整合原理分析
前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下
【一起學原始碼-微服務】Ribbon 原始碼四:進一步探究Ribbon的IRule和IPing
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Ribbon原始碼五:Ribbon原始碼解讀彙總篇~
前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F
【一起學原始碼-微服務】Feign 原始碼一:原始碼初探,通過Demo Debug Feign原始碼
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Feign 原始碼二:Feign動態代理構造過程
前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr
【一起學原始碼-微服務】Feign 原始碼三:Feign結合Ribbon實現負載均衡的原理分析
前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d
【一起學原始碼-微服務】Hystrix 原始碼一:Hystrix基礎原理與Demo搭建
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe
【一起學原始碼-微服務】Hystrix 原始碼二:Hystrix核心流程:Hystix非降級邏輯流程梳理
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr