Eureka註冊流程(2)
4. AcceptorExecutor#AcceptorRunner執行緒和TaskExecutors#BatchWorkerRunnable執行緒在初始化時作為守護執行緒啟動,這裡處理請求有批量和單個,但是共用一個AcceptorExecutor,只是引數maxBatchingSize不一樣,AcceptorRunner#run,先判斷執行緒有沒有被強制停止,
public void run() { long scheduleTime = 0; while (!isShutdown.get()) { try { drainInputQueues(); int totalItems = processingOrder.size(); long now = System.currentTimeMillis(); if (scheduleTime < now) { scheduleTime = now + trafficShaper.transmissionDelay(); } if (scheduleTime <= now) { assignBatchWork(); assignSingleItemWork(); } // If no worker is requesting data or there is a delay injected by the traffic shaper, // sleep for some time to avoid tight loop. if (totalItems == processingOrder.size()) { Thread.sleep(10); } } catch (InterruptedException ex) { // Ignore } catch (Throwable e) { // Safe-guard, so we never exit this loop in an uncontrolled way. logger.warn("Discovery AcceptorThread error", e); } } }
然後通過drainAcceptorQueue方法把acceptorQueue佇列中的任務取出來放入pendingTasks和processingOrder,代表的意思就是把需要執行的任務都放入待執行任務佇列中。
private boolean isFull() { return pendingTasks.size() >= maxBufferSize; } private void drainInputQueues() throws InterruptedException { do { drainReprocessQueue(); drainAcceptorQueue(); if (!isShutdown.get()) { // If all queues are empty, block for a while on the acceptor queue if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) { TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS); if (taskHolder != null) { appendTaskHolder(taskHolder); } } } } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()); } private void drainAcceptorQueue() { while (!acceptorQueue.isEmpty()) { appendTaskHolder(acceptorQueue.poll()); } } private void drainReprocessQueue() { long now = System.currentTimeMillis(); while (!reprocessQueue.isEmpty() && !isFull()) { TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast(); ID id = taskHolder.getId(); if (taskHolder.getExpiryTime() <= now) { expiredTasks++; } else if (pendingTasks.containsKey(id)) { overriddenTasks++; } else { pendingTasks.put(id, taskHolder); processingOrder.addFirst(id); } } if (isFull()) { queueOverflows += reprocessQueue.size(); reprocessQueue.clear(); } } private void appendTaskHolder(TaskHolder<ID, T> taskHolder) { if (isFull()) { pendingTasks.remove(processingOrder.poll()); queueOverflows++; } TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder); if (previousTask == null) { processingOrder.add(taskHolder.getId()); } else { overriddenTasks++; } }
然後判斷傳輸延遲等耗時開始分配任務,批量或單量,剛開始的時候還達不到批量處理任務的門檻,所以也就不會執行下面的程式assignBatchWork
void assignBatchWork() { if (hasEnoughTasksForNextBatch()) { if (batchWorkRequests.tryAcquire(1)) { long now = System.currentTimeMillis(); int len = Math.min(maxBatchingSize, processingOrder.size()); List<TaskHolder<ID, T>> holders = new ArrayList<>(len); while (holders.size() < len && !processingOrder.isEmpty()) { ID id = processingOrder.poll(); TaskHolder<ID, T> holder = pendingTasks.remove(id); if (holder.getExpiryTime() > now) { holders.add(holder); } else { expiredTasks++; } } if (holders.isEmpty()) { batchWorkRequests.release(); } else { batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS); batchWorkQueue.add(holders); } } } } private boolean hasEnoughTasksForNextBatch() { if (processingOrder.isEmpty()) { return false; } if (pendingTasks.size() >= maxBufferSize) { return true; } TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek()); long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp(); return delay >= maxBatchingDelay; }
這個時候會執行assignSingleItemWork,這個地方有個小設計Semaphore,他的初始值是0,所以下面這個程式是不會進入的,所以批量的任務也就不會被放進單次佇列中,
void assignSingleItemWork() {
if (!processingOrder.isEmpty()) {
if (singleItemWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
while (!processingOrder.isEmpty()) {
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
singleItemWorkQueue.add(holder);
return;
}
expiredTasks++;
}
singleItemWorkRequests.release();
}
}
}
這個訊號量Semaphore的可用值是從當批量任務執行器BatchWorkerRunnable需要執行任務的時候給設定的。
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
batchWorkRequests.release();
return batchWorkQueue;
}
BatchWorkerRunnable#run,獲取任務列表,放進ReplicationTaskProcessor中執行
public void run() {
try {
while (!isShutdown.get()) {
List<TaskHolder<ID, T>> holders = getWork();
metrics.registerExpiryTimes(holders);
List<T> tasks = getTasksOf(holders);
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
}
metrics.registerTaskResult(result, tasks.size());
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
List<TaskHolder<ID, T>> result;
do {
result = workQueue.poll(1, TimeUnit.SECONDS);
} while (!isShutdown.get() && result == null);
return (result == null) ? new ArrayList<>() : result;
}
private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
List<T> tasks = new ArrayList<>(holders.size());
for (TaskHolder<ID, T> holder : holders) {
tasks.add(holder.getTask());
}
return tasks;
}
用http同步客戶端請求各個服務節點同步客戶端例項資訊。介面為"peerreplication/batch/"
public ProcessingResult process(List<ReplicationTask> tasks) {
ReplicationList list = createReplicationListOf(tasks);
try {
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
// Unexpected error returned from the server. This should ideally never happen.
logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
return ProcessingResult.PermanentError;
}
} else {
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (maybeReadTimeOut(e)) {
logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
//read timeout exception is more Congestion then TransientError, return Congestion for longer delay
return ProcessingResult.Congestion;
} else if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
return ProcessingResult.TransientError;
} else {
logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
5. 被同步資訊的其他服務端節點接收同步訊息PeerReplicationResource#batchReplication,因為isReplication未true,所以在本節點處理完請求後也就不會再把該訊息同步給其他節點了。
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
相關推薦
Eureka註冊流程(2)
4. AcceptorExecutor#AcceptorRunner執行緒和TaskExecutors#BatchWorkerRunnable執行緒在初始化時作為守護執行緒啟動,這裡處理請求有批量和單個,但是共用一個AcceptorExecutor,只是引數maxBatchi
Eureka註冊流程(1)
1. 客戶端發起註冊本機資訊請求JerseyApplicationClient父類AbstractJerseyEurekaHttpClient#register public EurekaHttpResponse<Void> register(InstanceI
Django流程(2)
切換到建立專案的目錄 cd C:\Users\admin\Desktop\DjangoProject 建立名為project的專案命令 django-admin startproject project 注:所有路徑不要有中文 切換到目錄cd C:\U
Spring Cloud Eureka(服務治理)(2)
1.服務發現與消費 下面來嘗試構建一個服務消費者,它主要完成兩個目標,發現服務以及消費服務。其中服務發現的任務由Eureka的客戶端完成,而服務消費者的任務由Ribbon完成。Ribbon是一個基於HTTP和TCP的客戶端負載均衡器,它可以在通過客戶端中配置的ribbonServerList
django基本流程(2)
十、檢視的基本使用 10.1、在django中,檢視對web請求進行迴應,檢視就是一個函式,在view.py檔案中定義 10.2、定義檢視 from django.http import HttpResponse # Create your views here. def index
【Python web 開發】使用者註冊功能(2)
我們繼續來完善使用者的註冊功能 註冊的時候前端post過來 手機號,驗證碼,密碼,而密碼和驗證碼我們只需要在serialzers 裡面驗證用,不需要序列化返回給前端 正常的serialzers 是怎樣的邏輯呢? 我們來看下CreateModelMixin 的原始碼
SpringCloud入門教程之Eureka註冊中心(二)
學習SpringCloud技術前提就是學習Eureka註冊服務中心,而Eureka註冊服務中心,它是什麼呢?今天小編就帶你一起了解一下吧!!! Eureka 1.認識Eureka 2.原理圖 3.入門案例 1.認識
spring-cloud學習筆記Eureka註冊中心(四)修改成IP顯示
修改配置類 eureka: instance: #使用IP訪問註冊中心 prefer-ip-address: true #在註冊中心status的時候顯示的格式,這裡是 ip:埠 instance-id: ${spring.cloud.c
「小程式JAVA實戰」微信小程式的簡要註冊流程(二)
轉自:https://idig8.com/2018/08/09/xiaochengxu-chuji-02/ 瞭解了小程式的歷史和它未來的前景,我們開始註冊小程式 註冊小程式 可以參考官網介紹:https://developers.weixin.qq.com/miniprogram/int
Spring-cloud微服務實戰【三】:eureka註冊中心(中)
回憶一下,在上一篇文章中,我們建立了兩個springboot專案,並且在consumer專案中通過restTemplate進行HTTP通訊,成功訪問到了producer提供的介面,思考一下這樣的實現方式有什麼問題? 1.consumer必須知道producer的IP,才能呼叫對方的HTTP介面,並且在
Spring-cloud微服務實戰【四】:eureka註冊中心(下)
回憶一下,在上一篇文章中,我們使用eureka作為註冊中心,將producer註冊到eureka,並且在consumer中通過eureka發現producer服務進行呼叫,讓我們來分析一下,這樣是否已經足夠完美,還有沒有什麼問題? 1.首先,eureka沒有任何安全驗證,任何應用都可以訪問,這顯然不安全,因此
Spring Coud 2.0 Client 使用 https 註冊到 eureka server 中 (一)
使用Spring Cloud 元件搭建微服務時,預設情況下,eureka server 與 eureka client 之間的註冊與通訊都是 通過 http 方式,為了使交換更加安全,需要調整為Https,在這前大家需要自己百度一下HTTPS工作原理,這裡就不介紹了。
spring cloud 學習(2)核心: Eureka: 提供服務註冊和發現 註冊中心,負載均衡,故障轉移
官方描述:雲端服務發現,一個基於 REST 的服務,用於定位服務,以實現雲端中間層服務發現和故障轉移 Eureka 一個服務中心,一個註冊中心,將可以提供的服務都註冊到這個元件中, ,其他呼叫者需要的時候去註冊中心去獲取,然後再呼叫,避免了服務間的直接呼叫,實現了服務間的解耦
SpringCloud 原始碼系列(2)—— 註冊中心 Eureka(中)
五、服務註冊 1、例項資訊註冊器初始化 服務註冊的程式碼位置不容易發現,我們看 DiscoveryClient 初始化排程任務的這個方法,這段程式碼會去初始化一個例項資訊複製器 InstanceInfoReplicator,這個複製器就包含了例項的註冊(明明是註冊卻叫 Replicator 感覺怪怪的)。 ①
註冊登錄判斷---前臺代碼(2)
bsp ajax set hold com alert sage label tab 登錄代碼 1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 <meta chars
java基礎筆記(2)----流程控制
特性 byte 增加 基本 執行 size 判斷 efault 跳轉 java流程控制結構包括順序結構,分支結構,循環結構。 順序結構: 程序從上到下依次執行,中間沒有任何判斷和跳轉。 代碼如下: package c
DC學習(2)綜合的流程
模型 文本編輯器 步驟 div gui 其他 流程 http 生成 一:邏輯綜合的概述 synthesis = translation + logic optimization + gate mapping 1:Translation 主要把描述RTL級的HDL語言
Spring基礎:快速入門spring cloud(2):服務發現之eureka
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!  
Linux驅動開發(2)——設備註冊
結構體platform_device 註冊裝置使用結構體platform_device,原始碼路徑include/linux/platform_device.h struct platform_device { const char * name;//裝
spring cloud學習筆記(2)-- Eureka
傳統單體架構介紹及優缺點 一個專案包(war包,歸檔包)包含了應用的所有功能, 在沒有出現微服務概念之前,基本上都是這種架構形式存在, 我們一般把程式打包成一個檔案後,扔到tomcat或者jetty, jboss等應用伺服器中即可 特點: 部署很簡單,符合我們的思維;專案