【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析
前言
前情回顧
上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。
哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。
今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收穫了點東西。後面還會繼續feign、ribbon、hystrix的原始碼學習,依然文章連載的形式輸出。
本講目錄
本講主要是EurekaServer叢集模式的資料同步講解,主要目錄如下。
目錄如下:
- eureka server叢集機制
- 註冊、下線、續約的登錄檔同步機制
- 登錄檔同步三層佇列機制詳解
技術亮點:
- 3層佇列機制實現登錄檔的批量同步需求
說明
原創不易,如若轉載 請標明來源!
部落格地址:一枝花算不算浪漫
微信公眾號:壹枝花算不算浪漫
原始碼分析
eureka server叢集機制
Eureka Server會在註冊、下線、續約的時候進行資料同步,將資訊同步到其他Eureka Server節點。
可以想象到的是,這裡肯定不會是實時同步的,往後繼續看登錄檔的同步機制吧。
註冊、下線、續約的登錄檔同步機制
我們以Eureka Client註冊為例,看看Eureka Server是如何同步給其他節點的。
PeerAwareInstanceRegistryImpl.java
public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
- 註冊完成後,呼叫
replicateToPeers()
,注意這裡面有一個引數isReplication
,如果是true,代表是其他Eureka Server節點同步的,false則是EurekaClient註冊來的。 replicateToPeers()
中一段邏輯,如果isReplication
為true則直接跳出,這裡意思是client註冊來的服務例項需要向其他節點擴散,如果不是則不需要去同步peerEurekaNodes.getPeerEurekaNodes()
拿到所有的Eureka Server節點,迴圈遍歷去同步資料,呼叫replicateInstanceActionsToPeers()
replicateInstanceActionsToPeers()
方法中根據註冊、下線、續約等去處理不同邏輯
接下來就是真正執行同步邏輯的地方,這裡主要用了三層佇列對同步請求進行了batch操作,將請求打成一批批 然後向各個EurekaServer進行http請求。
登錄檔同步三層佇列機制詳解
到了這裡就是真正進入了同步的邏輯,這裡還是以上面註冊邏輯為主線,接著上述程式碼繼續往下跟:
PeerEurekaNode.java
:
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
這裡會執行batchingDispatcher.process()
方法,我們繼續點進去,然後會進入 TaskDispatchers.createBatchingTaskDispatcher()
方法,檢視其中的匿名內部類中的process()
方法:
void process(ID id, T task, long expiryTime) {
// 將請求都放入到acceptorQueue中
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
將需要同步的Task資料放入到acceptorQueue
佇列中。
接著回到createBatchingTaskDispatcher()
方法中,看下AcceptorExecutor
,它的建構函式中會啟動一個後臺執行緒:
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
我們繼續跟AcceptorRunner.java
:
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
// 處理acceptorQueue佇列中的資料
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
// 將processingOrder拆分成一個個batch,然後進行操作
assignBatchWork();
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
private void drainInputQueues() throws InterruptedException {
do {
drainAcceptorQueue();
if (!isShutdown.get()) {
// If all queues are empty, block for a while on the acceptor queue
if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
if (taskHolder != null) {
appendTaskHolder(taskHolder);
}
}
}
} while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
}
private void drainAcceptorQueue() {
while (!acceptorQueue.isEmpty()) {
// 將acceptor佇列中的資料放入到processingOrder佇列中去,方便後續拆分成batch
appendTaskHolder(acceptorQueue.poll());
}
}
private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
if (isFull()) {
pendingTasks.remove(processingOrder.poll());
queueOverflows++;
}
TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
if (previousTask == null) {
processingOrder.add(taskHolder.getId());
} else {
overriddenTasks++;
}
}
}
認真跟這裡面的程式碼,可以看到這裡是將上面的acceptorQueue
放入到processingOrder
, 其中processingOrder
也是一個佇列。
在AcceptorRunner.java
的run()
方法中,還會呼叫assignBatchWork()
方法,這裡面就是將processingOrder
打成一個個batch,接著看程式碼:
void assignBatchWork() {
if (hasEnoughTasksForNextBatch()) {
if (batchWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
int len = Math.min(maxBatchingSize, processingOrder.size());
List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
while (holders.size() < len && !processingOrder.isEmpty()) {
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
holders.add(holder);
} else {
expiredTasks++;
}
}
if (holders.isEmpty()) {
batchWorkRequests.release();
} else {
batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
// 將批量資料放入到batchWorkQueue中
batchWorkQueue.add(holders);
}
}
}
}
private boolean hasEnoughTasksForNextBatch() {
if (processingOrder.isEmpty()) {
return false;
}
// 預設maxBufferSize為250
if (pendingTasks.size() >= maxBufferSize) {
return true;
}
TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
// 預設maxBatchingDelay為500ms
long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
return delay >= maxBatchingDelay;
}
這裡加入batch的規則是:maxBufferSize
預設為250
maxBatchingDelay
預設為500ms,打成一個個batch後就開始傳送給server端。至於怎麼傳送 我們接著看 PeerEurekaNode.java
, 我們在最開始呼叫register()
方法就是呼叫PeerEurekaNode.register()
, 我們來看看它的構造方法:
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
這裡會例項化一個ReplicationTaskProcessor.java
, 我們跟進去,發下它是實現TaskProcessor
的,所以一定會執行此類中的process()
方法,執行方法如下:
public ProcessingResult process(List<ReplicationTask> tasks) {
ReplicationList list = createReplicationListOf(tasks);
try {
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
// Unexpected error returned from the server. This should ideally never happen.
logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
return ProcessingResult.PermanentError;
}
} else {
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
return ProcessingResult.TransientError;
} else {
logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
這裡面是將List<ReplicationTask> tasks
通過submitBatchUpdate()
傳送給server端。
server端在PeerReplicationResource.batchReplication()
去處理,實際上就是迴圈呼叫ApplicationResource.addInstance()
方法,又回到了最開始註冊的方法。
到此 EurekaServer同步的邏輯就結束了,這裡主要是三層佇列的資料結構很繞,通過一個batchList去批量同步資料的。
注意這裡還有一個很重要的點,就是Client註冊時呼叫addInstance()方法,這裡到了server端PeerAwareInstanceRegistryImpl
會執行同步其他EurekaServer邏輯。
而EurekaServer同步註冊介面仍然會呼叫addInstance()方法,這裡難不成就死迴圈呼叫了?當然不是,addInstance()中也有個引數:isReplication
, 在最後呼叫server端方法的時候如下:registry.register(info, "true".equals(isReplication));
我們知道,EurekaClient在註冊的時候isReplication
傳遞為空,所以這裡為false,而Server端同步的時候呼叫:
PeerReplicationResource
:
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
這裡的REPLICATION
為true
另外在AbstractJersey2EurekaHttpClient
中傳送register請求的時候,有個addExtraHeaders()
方法,如下圖:
如果是使用的Jersey2ReplicationClient
傳送的,那麼header中的x-netflix-discovery-replication
配置則為true,在後面執行註冊的addInstance()
方法中會接收這個引數的:
總結
仍然一圖流,文中解析的內容都包含在這張圖中了:
申明
本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫
相關推薦
【一起學原始碼-微服務】Nexflix Eureka 原始碼十二:EurekaServer叢集模式原始碼分析
前言 前情回顧 上一講看了Eureka 註冊中心的自我保護機制,以及裡面提到的bug問題。 哈哈 轉眼間都2020年了,這個系列的文章從12.17 一直寫到現在,也是不容易哈,每天持續不斷學習,輸出部落格,這一段時間確實收穫很多。 今天在公司給組內成員分享了Eureka原始碼剖析,反響效果還可以,也算是感覺收
【一起學原始碼-微服務】Nexflix Eureka 原始碼二:EurekaServer啟動之配置檔案載入以及面向介面的配置項讀取
前言 上篇文章已經介紹了 為何要讀netflix eureka原始碼了,這裡就不再概述,下面開始正式原始碼解讀的內容。 如若轉載 請標明來源:一枝花算不算浪漫 程式碼總覽 還記得上文中,我們通過web.xml找到了eureka server入口的類EurekaBootStrap,這裡我們就先來簡單地看下: /
【一起學原始碼-微服務】Nexflix Eureka 原始碼三:EurekaServer啟動之EurekaServer上下文EurekaClient建立
前言 上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些程式碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於介面對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Cli
【一起學原始碼-微服務】Nexflix Eureka 原始碼六:在眼花繚亂的程式碼中,EurekaClient是如何註冊的?
前言 上一講已經講解了EurekaClient的啟動流程,到了這裡已經有6篇Eureka原始碼分析的文章了,看了下之前的文章,感覺程式碼成分太多,會影響閱讀,後面會只擷取主要的程式碼,加上註釋講解。 這一講看的是EurekaClient註冊的流程,當然也是一塊核心,標題為什麼會寫上眼花繚亂呢?關於Eureka
【一起學原始碼-微服務】Nexflix Eureka 原始碼七:通過單元測試來Debug Eureka註冊過程
前言 上一講eureka client是如何註冊的,一直跟到原始碼傳送http請求為止,當時看eureka client註冊時如此費盡,光是找一個regiter的地方就找了半天,那麼client端傳送了http請求給server端,server端是如何處理的呢? 帶著這麼一個疑問 就開始今天原始碼的解讀了。
【一起學原始碼-微服務】Nexflix Eureka 原始碼八:EurekaClient登錄檔抓取 精妙設計分析!
前言 前情回顧 上一講 我們通過單元測試 來梳理了EurekaClient是如何註冊到server端,以及server端接收到請求是如何處理的,這裡最重要的關注點是登錄檔的一個數據結構:ConcurrentHashMap<String, Map<String, Lease<InstanceI
【一起學原始碼-微服務】Nexflix Eureka 原始碼九:服務續約原始碼分析
前言 前情回顧 上一講 我們講解了服務發現的相關邏輯,所謂服務發現 其實就是登錄檔抓取,服務例項預設每隔30s去註冊中心抓取一下注冊表增量資料,然後合併本地登錄檔資料,最後有個hash對比的操作。 本講目錄 今天主要是看下服務續約的邏輯,服務續約就是client端給server端傳送心跳檢測,告訴對方我還活著
【一起學原始碼-微服務】Nexflix Eureka 原始碼十:服務下線及例項摘除,一個client下線到底多久才會被其他例項感知?
前言 前情回顧 上一講我們講了 client端向server端傳送心跳檢查,也是預設每30鍾傳送一次,server端接收後會更新登錄檔的一個時間戳屬性,然後一次心跳(續約)也就完成了。 本講目錄 這一篇有兩個知識點及一個疑問,這個疑問是在工作中真真實實遇到過的。 例如我有服務A、服務B,A、B都註冊在同一個註
【一起學原始碼-微服務】Nexflix Eureka 原始碼十一:EurekaServer自我保護機制竟然有這麼多Bug?
前言 前情回顧 上一講主要講了服務下線,已經註冊中心自動感知宕機的服務。 其實上一講已經包含了很多EurekaServer自我保護的程式碼,其中還發現了1.7.x(1.9.x)包含的一些bug,但這些問題在master分支都已修復了。 服務下線會將服務例項從登錄檔中刪除,然後放入到recentQueue中,下
【一起學原始碼-微服務】Nexflix Eureka 原始碼十三:Eureka原始碼解讀完結撒花篇~!
前言 想說的話 【一起學原始碼-微服務-Netflix Eureka】專欄到這裡就已經全部結束了。 實話實說,從最開始Eureka Server和Eureka Client初始化的流程還是一臉悶逼,到現在Eureka各種操作都瞭然於心了。 本專欄從12.17開始寫,一直到今天12.30(文章在平臺是延後釋出的
【一起學原始碼-微服務】Ribbon 原始碼一:Ribbon概念理解及Demo除錯
前言 前情回顧 前面文章已經梳理清楚了Eureka相關的概念及原始碼,接下來開始研究下Ribbon的實現原理。 我們都知道Ribbon在spring cloud中擔當負載均衡的角色, 當兩個Eureka Client互相呼叫的時候,Ribbon能夠做到呼叫時的負載,保證多節點的客戶端均勻接收請求。(這個有點類
【一起學原始碼-微服務】Ribbon 原始碼二:通過Debug找出Ribbon初始化流程及ILoadBalancer原理分析
前言 前情回顧 上一講講了Ribbon的基礎知識,通過一個簡單的demo看了下Ribbon的負載均衡,我們在RestTemplate上加了@LoadBalanced註解後,就能夠自動的負載均衡了。 本講目錄 這一講主要是繼續深入RibbonLoadBalancerClient和Ribbon+Eureka整合的
【一起學原始碼-微服務】Ribbon 原始碼三:Ribbon與Eureka整合原理分析
前言 前情回顧 上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration 到RibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer預設初始化的物件等。 本講目錄 這一講我們會進一步往下
【一起學原始碼-微服務】Ribbon 原始碼四:進一步探究Ribbon的IRule和IPing
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Ribbon原始碼五:Ribbon原始碼解讀彙總篇~
前言 想說的話 【一起學原始碼-微服務-Ribbon】專欄到這裡就已經全部結束了,共更新四篇文章。 Ribbon比較小巧,這裡是直接 讀的spring cloud 內嵌封裝的版本,裡面的各種configuration確實有點繞,不過看看第三講Ribbon初始化的過程總結圖就會清晰很多。 緊接著會繼續整理學習F
【一起學原始碼-微服務】Feign 原始碼一:原始碼初探,通過Demo Debug Feign原始碼
前言 前情回顧 上一講深入的講解了Ribbon的初始化過程及Ribbon與Eureka的整合程式碼,與Eureka整合的類就是DiscoveryEnableNIWSServerList,同時在DynamicServerListLoadBalancer中會呼叫PollingServerListUpdater 進
【一起學原始碼-微服務】Feign 原始碼二:Feign動態代理構造過程
前言 前情回顧 上一講主要看了@EnableFeignClients中的registerBeanDefinitions()方法,這裡面主要是 將EnableFeignClients註解對應的配置屬性注入,將FeignClient註解對應的屬性注入。 最後是生成FeignClient對應的bean,注入到Spr
【一起學原始碼-微服務】Feign 原始碼三:Feign結合Ribbon實現負載均衡的原理分析
前言 前情回顧 上一講我們已經知道了Feign的工作原理其實是在專案啟動的時候,通過JDK動態代理為每個FeignClinent生成一個動態代理。 動態代理的資料結構是:ReflectiveFeign.FeignInvocationHandler。其中包含target(裡面是serviceName等資訊)和d
【一起學原始碼-微服務】Hystrix 原始碼一:Hystrix基礎原理與Demo搭建
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一個系列文章講解了Feign的原始碼,主要是Feign動態代理實現的原理,及配合Ribbon實現負載均衡的機制。 這裡我們講解一個新的元件Hystrix,也是和Fe
【一起學原始碼-微服務】Hystrix 原始碼二:Hystrix核心流程:Hystix非降級邏輯流程梳理
說明 原創不易,如若轉載 請標明來源! 歡迎關注本人微信公眾號:壹枝花算不算浪漫 更多內容也可檢視本人部落格:一枝花算不算浪漫 前言 前情回顧 上一講我們講了配置了feign.hystrix.enabled=true之後,預設的Targeter就會構建成HystrixTargter, 然後通過對應的Hystr