1. 程式人生 > >Eureka註冊流程(2)

Eureka註冊流程(2)

4. AcceptorExecutor#AcceptorRunner執行緒和TaskExecutors#BatchWorkerRunnable執行緒在初始化時作為守護執行緒啟動,這裡處理請求有批量和單個,但是共用一個AcceptorExecutor,只是引數maxBatchingSize不一樣,AcceptorRunner#run,先判斷執行緒有沒有被強制停止,

 public void run() {
            long scheduleTime = 0;
            while (!isShutdown.get()) {
                try {
                    drainInputQueues();

                    int totalItems = processingOrder.size();

                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        scheduleTime = now + trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        assignBatchWork();
                        assignSingleItemWork();
                    }

                    // If no worker is requesting data or there is a delay injected by the traffic shaper,
                    // sleep for some time to avoid tight loop.
                    if (totalItems == processingOrder.size()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException ex) {
                    // Ignore
                } catch (Throwable e) {
                    // Safe-guard, so we never exit this loop in an uncontrolled way.
                    logger.warn("Discovery AcceptorThread error", e);
                }
            }
        }

然後通過drainAcceptorQueue方法把acceptorQueue佇列中的任務取出來放入pendingTasks和processingOrder,代表的意思就是把需要執行的任務都放入待執行任務佇列中。

private boolean isFull() {
            return pendingTasks.size() >= maxBufferSize;
        }

private void drainInputQueues() throws InterruptedException {
            do {
                drainReprocessQueue();
                drainAcceptorQueue();

                if (!isShutdown.get()) {
                    // If all queues are empty, block for a while on the acceptor queue
                    if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                        TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                        if (taskHolder != null) {
                            appendTaskHolder(taskHolder);
                        }
                    }
                }
            } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
        }

        private void drainAcceptorQueue() {
            while (!acceptorQueue.isEmpty()) {
                appendTaskHolder(acceptorQueue.poll());
            }
        }

        private void drainReprocessQueue() {
            long now = System.currentTimeMillis();
            while (!reprocessQueue.isEmpty() && !isFull()) {
                TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
                ID id = taskHolder.getId();
                if (taskHolder.getExpiryTime() <= now) {
                    expiredTasks++;
                } else if (pendingTasks.containsKey(id)) {
                    overriddenTasks++;
                } else {
                    pendingTasks.put(id, taskHolder);
                    processingOrder.addFirst(id);
                }
            }
            if (isFull()) {
                queueOverflows += reprocessQueue.size();
                reprocessQueue.clear();
            }
        }

private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
            if (isFull()) {
                pendingTasks.remove(processingOrder.poll());
                queueOverflows++;
            }
            TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
            if (previousTask == null) {
                processingOrder.add(taskHolder.getId());
            } else {
                overriddenTasks++;
            }
        }

然後判斷傳輸延遲等耗時開始分配任務,批量或單量,剛開始的時候還達不到批量處理任務的門檻,所以也就不會執行下面的程式assignBatchWork

 void assignBatchWork() {
            if (hasEnoughTasksForNextBatch()) {
                if (batchWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    int len = Math.min(maxBatchingSize, processingOrder.size());
                    List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                    while (holders.size() < len && !processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            holders.add(holder);
                        } else {
                            expiredTasks++;
                        }
                    }
                    if (holders.isEmpty()) {
                        batchWorkRequests.release();
                    } else {
                        batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                        batchWorkQueue.add(holders);
                    }
                }
            }
        }

        private boolean hasEnoughTasksForNextBatch() {
            if (processingOrder.isEmpty()) {
                return false;
            }
            if (pendingTasks.size() >= maxBufferSize) {
                return true;
            }

            TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
            long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
            return delay >= maxBatchingDelay;
        }

這個時候會執行assignSingleItemWork,這個地方有個小設計Semaphore,他的初始值是0,所以下面這個程式是不會進入的,所以批量的任務也就不會被放進單次佇列中,

 void assignSingleItemWork() {
            if (!processingOrder.isEmpty()) {
                if (singleItemWorkRequests.tryAcquire(1)) {
                    long now = System.currentTimeMillis();
                    while (!processingOrder.isEmpty()) {
                        ID id = processingOrder.poll();
                        TaskHolder<ID, T> holder = pendingTasks.remove(id);
                        if (holder.getExpiryTime() > now) {
                            singleItemWorkQueue.add(holder);
                            return;
                        }
                        expiredTasks++;
                    }
                    singleItemWorkRequests.release();
                }
            }
        }

這個訊號量Semaphore的可用值是從當批量任務執行器BatchWorkerRunnable需要執行任務的時候給設定的。

 BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
        batchWorkRequests.release();
        return batchWorkQueue;
    }

BatchWorkerRunnable#run,獲取任務列表,放進ReplicationTaskProcessor中執行

public void run() {
            try {
                while (!isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = getWork();
                    metrics.registerExpiryTimes(holders);

                    List<T> tasks = getTasksOf(holders);
                    ProcessingResult result = processor.process(tasks);
                    switch (result) {
                        case Success:
                            break;
                        case Congestion:
                        case TransientError:
                            taskDispatcher.reprocess(holders, result);
                            break;
                        case PermanentError:
                            logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
                    }
                    metrics.registerTaskResult(result, tasks.size());
                }
            } catch (InterruptedException e) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery WorkerThread error", e);
            }
        }

        private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
            BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
            List<TaskHolder<ID, T>> result;
            do {
                result = workQueue.poll(1, TimeUnit.SECONDS);
            } while (!isShutdown.get() && result == null);
            return (result == null) ? new ArrayList<>() : result;
        }

        private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
            List<T> tasks = new ArrayList<>(holders.size());
            for (TaskHolder<ID, T> holder : holders) {
                tasks.add(holder.getTask());
            }
            return tasks;
        }

用http同步客戶端請求各個服務節點同步客戶端例項資訊。介面為"peerreplication/batch/"

 public ProcessingResult process(List<ReplicationTask> tasks) {
        ReplicationList list = createReplicationListOf(tasks);
        try {
            EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
            int statusCode = response.getStatusCode();
            if (!isSuccess(statusCode)) {
                if (statusCode == 503) {
                    logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                    return ProcessingResult.Congestion;
                } else {
                    // Unexpected error returned from the server. This should ideally never happen.
                    logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                    return ProcessingResult.PermanentError;
                }
            } else {
                handleBatchResponse(tasks, response.getEntity().getResponseList());
            }
        } catch (Throwable e) {
            if (maybeReadTimeOut(e)) {
                logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
            	//read timeout exception is more Congestion then TransientError, return Congestion for longer delay 
                return ProcessingResult.Congestion;
            } else if (isNetworkConnectException(e)) {
                logNetworkErrorSample(null, e);
                return ProcessingResult.TransientError;
            } else {
                logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
                return ProcessingResult.PermanentError;
            }
        }
        return ProcessingResult.Success;
    }

5. 被同步資訊的其他服務端節點接收同步訊息PeerReplicationResource#batchReplication,因為isReplication未true,所以在本節點處理完請求後也就不會再把該訊息同步給其他節點了。

private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
        applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
        return new Builder().setStatusCode(Status.OK.getStatusCode());
    }

相關推薦

Eureka註冊流程2

4. AcceptorExecutor#AcceptorRunner執行緒和TaskExecutors#BatchWorkerRunnable執行緒在初始化時作為守護執行緒啟動,這裡處理請求有批量和單個,但是共用一個AcceptorExecutor,只是引數maxBatchi

Eureka註冊流程1

1. 客戶端發起註冊本機資訊請求JerseyApplicationClient父類AbstractJerseyEurekaHttpClient#register public EurekaHttpResponse<Void> register(InstanceI

Django流程2

切換到建立專案的目錄 cd C:\Users\admin\Desktop\DjangoProject 建立名為project的專案命令 django-admin startproject project 注:所有路徑不要有中文 切換到目錄cd C:\U

Spring Cloud Eureka(服務治理)2

1.服務發現與消費 下面來嘗試構建一個服務消費者,它主要完成兩個目標,發現服務以及消費服務。其中服務發現的任務由Eureka的客戶端完成,而服務消費者的任務由Ribbon完成。Ribbon是一個基於HTTP和TCP的客戶端負載均衡器,它可以在通過客戶端中配置的ribbonServerList

django基本流程2

十、檢視的基本使用 10.1、在django中,檢視對web請求進行迴應,檢視就是一個函式,在view.py檔案中定義 10.2、定義檢視 from django.http import HttpResponse # Create your views here. def index

【Python web 開發】使用者註冊功能2

我們繼續來完善使用者的註冊功能 註冊的時候前端post過來 手機號,驗證碼,密碼,而密碼和驗證碼我們只需要在serialzers 裡面驗證用,不需要序列化返回給前端 正常的serialzers  是怎樣的邏輯呢? 我們來看下CreateModelMixin 的原始碼  

SpringCloud入門教程之Eureka註冊中心

學習SpringCloud技術前提就是學習Eureka註冊服務中心,而Eureka註冊服務中心,它是什麼呢?今天小編就帶你一起了解一下吧!!! Eureka 1.認識Eureka 2.原理圖 3.入門案例 1.認識

spring-cloud學習筆記Eureka註冊中心修改成IP顯示

修改配置類 eureka: instance: #使用IP訪問註冊中心 prefer-ip-address: true #在註冊中心status的時候顯示的格式,這裡是 ip:埠 instance-id: ${spring.cloud.c

「小程式JAVA實戰」微信小程式的簡要註冊流程

轉自:https://idig8.com/2018/08/09/xiaochengxu-chuji-02/ 瞭解了小程式的歷史和它未來的前景,我們開始註冊小程式 註冊小程式 可以參考官網介紹:https://developers.weixin.qq.com/miniprogram/int

Spring-cloud微服務實戰【三】:eureka註冊中心

  回憶一下,在上一篇文章中,我們建立了兩個springboot專案,並且在consumer專案中通過restTemplate進行HTTP通訊,成功訪問到了producer提供的介面,思考一下這樣的實現方式有什麼問題?   1.consumer必須知道producer的IP,才能呼叫對方的HTTP介面,並且在

Spring-cloud微服務實戰【四】:eureka註冊中心

回憶一下,在上一篇文章中,我們使用eureka作為註冊中心,將producer註冊到eureka,並且在consumer中通過eureka發現producer服務進行呼叫,讓我們來分析一下,這樣是否已經足夠完美,還有沒有什麼問題? 1.首先,eureka沒有任何安全驗證,任何應用都可以訪問,這顯然不安全,因此

Spring Coud 2.0 Client 使用 https 註冊eureka server 中

使用Spring Cloud 元件搭建微服務時,預設情況下,eureka server 與 eureka client 之間的註冊與通訊都是 通過 http 方式,為了使交換更加安全,需要調整為Https,在這前大家需要自己百度一下HTTPS工作原理,這裡就不介紹了。

spring cloud 學習2核心: Eureka: 提供服務註冊和發現 註冊中心,負載均衡,故障轉移

官方描述:雲端服務發現,一個基於 REST 的服務,用於定位服務,以實現雲端中間層服務發現和故障轉移 Eureka 一個服務中心,一個註冊中心,將可以提供的服務都註冊到這個元件中, ,其他呼叫者需要的時候去註冊中心去獲取,然後再呼叫,避免了服務間的直接呼叫,實現了服務間的解耦

SpringCloud 原始碼系列2—— 註冊中心 Eureka

五、服務註冊 1、例項資訊註冊器初始化 服務註冊的程式碼位置不容易發現,我們看 DiscoveryClient 初始化排程任務的這個方法,這段程式碼會去初始化一個例項資訊複製器 InstanceInfoReplicator,這個複製器就包含了例項的註冊(明明是註冊卻叫 Replicator 感覺怪怪的)。 ①

註冊登錄判斷---前臺代碼2

bsp ajax set hold com alert sage label tab 登錄代碼 1 <!DOCTYPE html> 2 <html lang="en"> 3 <head> 4 <meta chars

java基礎筆記2----流程控制

特性 byte 增加 基本 執行 size 判斷 efault 跳轉 java流程控制結構包括順序結構,分支結構,循環結構。 順序結構: 程序從上到下依次執行,中間沒有任何判斷和跳轉。 代碼如下: package c

DC學習2綜合的流程

模型 文本編輯器 步驟 div gui 其他 流程 http 生成 一:邏輯綜合的概述   synthesis = translation + logic optimization + gate mapping 1:Translation   主要把描述RTL級的HDL語言

Spring基礎:快速入門spring cloud2:服務發現之eureka

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Linux驅動開發2——設備註冊

結構體platform_device 註冊裝置使用結構體platform_device,原始碼路徑include/linux/platform_device.h struct platform_device { const char * name;//裝

spring cloud學習筆記2-- Eureka

傳統單體架構介紹及優缺點 一個專案包(war包,歸檔包)包含了應用的所有功能, 在沒有出現微服務概念之前,基本上都是這種架構形式存在, 我們一般把程式打包成一個檔案後,扔到tomcat或者jetty, jboss等應用伺服器中即可 特點: 部署很簡單,符合我們的思維;專案