1. 程式人生 > >Eureka高可用之Client重試機制:RetryableEurekaHttpClient

Eureka高可用之Client重試機制:RetryableEurekaHttpClient

下面有幾個疑問是我看原始碼時問自己的,先提出來,希望看這篇文章的人帶著疑問去讀,然後初步介紹下EurekaHttpClient體系,後面會詳細講RetryableEurekaHttpClient

1、Eureka Client如何向Eureka Server叢集註冊?如果我的Client端的ServiceUrl配置了多個Eureka Service地址,那麼Client是否會向每一個Server發起註冊?

2、Eureka Server具有複製行為,即向其他Eureka Server節點複製自身的例項資訊,那麼既然有複製行為,那麼Eureka Client的ServiceUrl中只配置一個不就行了嗎,我註冊到Server上,Server自己去把我的資訊複製為其他Eureka Server節點不就好了嗎,是否就說明Eureka Client的ServiceUrl只配置一個就好?

3、如果Eureka Client的ServiceUrl配置了多個,那麼Client會和那個Eureka Server保持通訊(註冊、續約心跳等)?是否是第一個,或者是隨機的?

defaultZone: http://server3:50220/eureka,http://server1:50100/eureka,http://server2:50300/eureka

RetryableEurekaHttpClient繼承自EurekaHttpClient裝飾器EurekaHttpClientDecorator,它並不是真正發起http請求的HttpClient,它會間接的把請求委託給AbstractJerseyEurekaHttpClient,如下類圖:

EurekaHttpClientDecorator這個類採用了模板方法模式,在register、cancel、sendHeartBeat等行為中,抽象出了execute方法,讓子類自定義執行行為

//匿名介面,沒有具體實現類
public interface RequestExecutor<R> {
    EurekaHttpResponse<R> execute(EurekaHttpClient delegate);
    RequestType getRequestType();
}

//採用模板方法設計模式,從register、cancel、sendHeartBeat等行為中抽象出execute行為,讓子類自己去定義具體實現
protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor);

下面只列出register方法

@Override
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
    //匿名介面的實現,呼叫子類的execute方法
    return execute(new RequestExecutor<Void>() {
        @Override
        public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
            //一步一步委託,最後委託給AbstractJerseyEurekaHttpClient
            return delegate.register(info);
        }

        @Override
        public RequestType getRequestType() {
            return RequestType.Register;
        }
    });
}

這篇文章主要介紹RetryableEurekaHttpClient,算是比較重要的,後面有時間再去寫其他幾個。

顧名思義,可重試的HttpClient,那麼這個類中一定會有重試機制的實現,我們先來看看它的execute(RequestExecutor<R> requestExecutor)方法,我們看到這個方法裡面有個for迴圈,並且在迴圈內發起了http請求,這個迴圈預設的重試次數為3(沒有配置可以配置這個次數)。

這個迴圈裡面有一個getHostCandidates()方法,獲取所有可用的Eureka Server端點,然後通過endpointIdx++遍歷Eureka Server端點發送http請求,如果請求過程中出現超時等異常,注意catch程式碼塊中並沒有丟擲異常,而是記錄日誌,然後將這個超時的Eureka Server端點加入黑名單quarantineSet中,繼續進行for迴圈。

異常處理是重試機制重要的一環,如果這個地方沒有try catch或者直接丟擲異常,那麼比如有三個serviceUrl,某一時間在向server3發起請求的時候出現異常,即便後面兩個server1和server2是可用的,也不會去請求了(丟擲異常,後面for迴圈程式碼不會執行了)。

那麼由於numberOfRetries等於3,也就是說,最多重試三次,如果都不成功,即便第四個serviceUrl是可用的,也不會去嘗試了。

@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    //候選Eureka ServerList的下標
    int endpointIdx = 0;
    //預設重試3次,DEFAULT_NUMBER_OF_RETRIES = 3
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                //獲取候選Eureka Server 的serviceUrlList
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    //如果出現這個異常,基本山可以肯定的是,沒有配置serviceUrl和remoteRegion
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                // 這個異常也很常見,這個方法裡面的迴圈預設要執行三次,當你只配了一個ServiceUrl,
                // 並且是無效的,那麼在第二次重試的時候,就會丟擲這個異常
                throw new TransportException("Cannot execute request on any known server");
            }
            //獲取serviceUrl資訊
            currentEndpoint = candidateHosts.get(endpointIdx++);
            //根據新的serviceUrl資訊構建新的httpClient
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            //向serviceUrl發起請求,register、heartBeat、Cancel、statusUpdate。
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            // serverStatusEvaluator為狀態評估器,為每個請求型別(Register、SendHeartBeat、Cancel、GetDelta等)
            // 設定可接受的狀態碼,比如,當請求型別為Register,且response.getStatusCode()為404,那麼此時也算可接受的
            // 不再去嘗試下一個ServiceURl
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            //如果請求過程中,出現連線超時等異常,列印日誌,更新currentHttpClient,更換下一個serviceUrl重新嘗試
            logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
        }

        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            //http請求失敗,將當前嘗試的Eureka Server端點放入黑名單。
            quarantineSet.add(currentEndpoint);
        }
    }
    //如果三次都沒有請求成功,則放棄請求,如果serviceUrl中配置了4個Eureka地址,前三個都請求失敗了,那麼即便第四個serviceUrl可用,也不會去嘗試
    throw new TransportException("Retry limit reached; giving up on completing the request");
}

那麼從上面的程式碼可以得出結論:

1、Eureka Client在傳送註冊、心跳等請求時,會向Eureka Server叢集節點serviceUrlList順序逐個去嘗試,如果有一個請求成功了,那麼直接返回response ,不再去向其他節點請求,最多隻重試3次,超過3次直接丟擲異常。

2、如果按如下配置defaultZone那麼請求的順序是server3->server1->server2

3、defaultZone建議配置多個url,有多少配置多少,即便大於3,因為有的server可能被client拉黑了,不會被client請求,也就不會計入numberOfRetries次數

4、如果下面這個配置中server3永遠可用,那麼這個Client永遠只向這一個server傳送心跳等事件

5、Eureka Client的defaultZone不要都配置成一樣的順序,最好打亂配置,如果所有的Eureka Client都按以下的配置,那麼這個server3的壓力很大,既要負責接收所有client的心跳狀態變更等,又要負責向其他server叢集節點同步資訊

defaultZone: http://server3:50220/eureka,http://server1:50100/eureka,http://server2:50300/eureka

getHostCandidates()這個方法是用來獲取ServiceUrlList的,它內部有個黑名單機制,如果向一個Eureka Server端點發起請求異常失敗,那麼就會把這個Eureka Server端點放入quarantineSet(隔離集合)裡面,下一次呼叫getHostCandidates()方法時候(在上面那個for迴圈裡面,這個方法只會執行一次),會拿quarantineSet.size()和一個閾值做比較,如果小於這個閾值,那麼就會對candidateHosts 進行過濾。

這個黑名單機制,其根本目的是為了讓Eureka Client請求成功的概率更大,想象一下,如果上面那個server3永遠掛了,而且還沒好辦法去動態改變Client的defaultZone的配置,那麼每30秒傳送一次心跳的時候,client都會去請求一下server3

private List<EurekaEndpoint> getHostCandidates() {
    //獲取所有的Eureka Server叢集節點
    List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
    //黑名單取交集,看看candidateHosts裡面有幾個Server端點是在黑名單裡面的
    quarantineSet.retainAll(candidateHosts);

    // If enough hosts are bad, we have no choice but start over again
    //這個百分比預設是0.66,約等於2/3,
    //舉個栗子,如果candidateHosts=3,那麼閾值threshold就等於1(3*0.66=1.98,在int強轉型就等於1了...)
    int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
    //Prevent threshold is too large
    if (threshold > candidateHosts.size()) {
        //防止閾值過大,這個百分比有可能被人誤設成大於1的值
        threshold = candidateHosts.size();
    }
    if (quarantineSet.isEmpty()) {
        //黑名單是空的,不進行過濾
        // no-op
    } else if (quarantineSet.size() >= threshold) {
        //黑名單的數量大於這個閾值了,清空黑名單,不進行過濾
        //設定閾值目的就是在於防止所有的serverlist都不可用,都被拉黑了
        //所以要清空黑名單,重新進行嘗試
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
    } else {
        //如果小於閾值,那麼過濾掉黑名單裡面的端點
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }

    return candidateHosts;
}

這個RetryableEurekaHttpClient的重試機制基本上就講差不多了,如果有想自己去除錯的同學,可以在Client端按照如下配置defaultZone,然後只開啟一臺Eureka Server(server2)然後在RetryableEurekaHttpClient類的execute方法裡面打上斷點,debug啟動Client即可(register-with-eureka和fetch-registry屬性一定要有一個配置成true

defaultZone: http://server3:50220/eureka,http://server1:50100/eureka,http://server2:50300/eureka