1. 程式人生 > >httpclient架構原理介紹 & 連線池詳解

httpclient架構原理介紹 & 連線池詳解

本篇重點介紹httpclient連線池的相關原理以及介紹,順帶的介紹httpclient傳送請求時的簡單介紹,並會帶上一些原始碼分析。本篇博文是基於httpclient的4.5.2版本進行介紹的

一、傳送請求的流程原理

幾個關鍵的類和介面介紹

在介紹架構原理前,先介紹幾個類和介面,方便讀者對httpclient的整體設計有個大概的概念。

HttpClient:一個介面,即http客戶端的抽象,主要就是用它傳送請求http請求。它的主要實現有CloseableHttpClient,相信讀者們比較熟悉。

HttpRequestBase:一個抽象類,是請求內容的抽象。包括了請求協議、uri、還有一些配置。我們常用的HttpGet和HttpPost都是它的子類。

HttpClientConnectionManager:一個介面,連線管理的抽象。一般要傳送http請求前,需要和目標服務建立連線,然後再發送資料包。這個連線管理器可以對連線以池的方式進行管理。

HttpRoute:一個final類,用來表示目標伺服器(ip+埠)。

傳送流程圖

這裡寫圖片描述

一個HttpRequestBase在被httpclient執行後,會經過一個鏈路被一個個元件處理。這裡使用了職責鏈的設計模式,一個元件處理完後,就會交給下一個元件處理。這樣做的好處就是如果要移除一個元件或者新增一個新的元件來實現對請求的一些處理非常方便。這裡要說一下,上圖列的元件中的一些是根據配置決定是否加入到該執行鏈中。

我們一般通過CloseableHttpClient httpClient = HttpClients.custom().build();獲取到一個httpClient。這裡返回的實際物件其實是InternalHttpClient類,所以執行httpclient.execute(request)時候最終會呼叫InternalHttpClient#doExecute()。我們看下對應的原始碼

protected CloseableHttpResponse doExecute(
            final HttpHost target,
            final HttpRequest request,
            final
HttpContext context) throws IOException, ClientProtocolException { Args.notNull(request, "HTTP request"); HttpExecutionAware execAware = null; if (request instanceof HttpExecutionAware) { execAware = (HttpExecutionAware) request; } try { final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target); final HttpClientContext localcontext = HttpClientContext.adapt( context != null ? context : new BasicHttpContext()); RequestConfig config = null; if (request instanceof Configurable) { config = ((Configurable) request).getConfig(); } if (config == null) { final HttpParams params = request.getParams(); if (params instanceof HttpParamsNames) { if (!((HttpParamsNames) params).getNames().isEmpty()) { config = HttpClientParamConfig.getRequestConfig(params); } } else { config = HttpClientParamConfig.getRequestConfig(params); } } if (config != null) { localcontext.setRequestConfig(config); } setupContext(localcontext); final HttpRoute route = determineRoute(target, wrapper, localcontext); return this.execChain.execute(route, wrapper, localcontext, execAware); } catch (final HttpException httpException) { throw new ClientProtocolException(httpException); } }

通過程式碼可以看到,這裡主要是做兩件事

  1. 獲取requestConfig,然後設定到請求上下文中
  2. 通過請求獲取對應的目標伺服器HttpRoute

之後就交給處理鏈處理請求。

處理鏈的構造是在InternalHttpClient的構造中完成的,也就是HttpClients.custom().build()方法中構造起來的。我們看下處理鏈的構造程式碼

    public CloseableHttpClient build() {
        ...
        ClientExecChain execChain = createMainExec(
                requestExecCopy,
                connManagerCopy,
                reuseStrategyCopy,
                keepAliveStrategyCopy,
                new ImmutableHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
                targetAuthStrategyCopy,
                proxyAuthStrategyCopy,
                userTokenHandlerCopy);

        execChain = decorateMainExec(execChain);

        ...
        execChain = new ProtocolExec(execChain, httpprocessorCopy);

        execChain = decorateProtocolExec(execChain);

        // Add request retry executor, if not disabled
        if (!automaticRetriesDisabled) {
            HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
            if (retryHandlerCopy == null) {
                retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
            }
            execChain = new RetryExec(execChain, retryHandlerCopy);
        }

        HttpRoutePlanner routePlannerCopy = this.routePlanner;
        if (routePlannerCopy == null) {
            SchemePortResolver schemePortResolverCopy = this.schemePortResolver;
            if (schemePortResolverCopy == null) {
                schemePortResolverCopy = DefaultSchemePortResolver.INSTANCE;
            }
            if (proxy != null) {
                routePlannerCopy = new DefaultProxyRoutePlanner(proxy, schemePortResolverCopy);
            } else if (systemProperties) {
                routePlannerCopy = new SystemDefaultRoutePlanner(
                        schemePortResolverCopy, ProxySelector.getDefault());
            } else {
                routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
            }
        }
        // Add redirect executor, if not disabled
        if (!redirectHandlingDisabled) {
            RedirectStrategy redirectStrategyCopy = this.redirectStrategy;
            if (redirectStrategyCopy == null) {
                redirectStrategyCopy = DefaultRedirectStrategy.INSTANCE;
            }
            execChain = new RedirectExec(execChain, routePlannerCopy, redirectStrategyCopy);
        }

        // Optionally, add service unavailable retry executor
        final ServiceUnavailableRetryStrategy serviceUnavailStrategyCopy = this.serviceUnavailStrategy;
        if (serviceUnavailStrategyCopy != null) {
            execChain = new ServiceUnavailableRetryExec(execChain, serviceUnavailStrategyCopy);
        }
        // Optionally, add connection back-off executor
        if (this.backoffManager != null && this.connectionBackoffStrategy != null) {
            execChain = new BackoffStrategyExec(execChain, this.connectionBackoffStrategy, this.backoffManager);
        }
        ...
        return new InternalHttpClient(
                execChain,
                connManagerCopy,
                routePlannerCopy,
                cookieSpecRegistryCopy,
                authSchemeRegistryCopy,
                defaultCookieStore,
                defaultCredentialsProvider,
                defaultRequestConfig != null ? defaultRequestConfig : RequestConfig.DEFAULT,
                closeablesCopy);
    }

由於這個方法太長,所以只保留了處理鏈的那部分程式碼。

下面介紹處理鏈中各個元件的一個大概功能:

  1. MainClientExec:主要執行客戶端請求的,通過連線管理器,來把請求繫結到具體的連線上面,接著傳送請求。同時也是在這個元件裡面做連線的池化處理等。
  2. ProtocolExec:通過一系列的HttpProcessor處理鏈對Http訊息按格式編碼以及解碼。每一個processor處理一個範疇的事情,比如處理header,content以及cookie等等。我們可以往HttpRequestInterceptor和HttpResponseInterceptor中新增我們自己定義的攔截器。這樣,HttpProcessor在處理請求和響應前,就會經過我們設定的攔截器進行相應的操作。
  3. RetryExec:進行重連操作。是否要重連的判斷的根據配置的HttpRequestRetryHandler。
  4. RedirectExec:處理重定向的情況
  5. ServiceUnavailableRetryExec:返回503進行重試
  6. BackoffStrategyExec:對出現連線或者響應超時異常的route進行降級,縮小該route上連線數,能使得服務質量更好的route能得到更多的連線。降級的速度可以通過因子設定,預設是每次降級減少一半的連線數,即降級因子是0.5。

上圖中的HttpRequestExecutor只是MainClientExec中的元件,用於真正的傳送http請求給目標伺服器。

二、連線池的管理

通過下面這段程式碼,我們可以給httpclient設定連線管理器

    private static CloseableHttpClient createHttpClient() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // 將最大連線數新增
        cm.setMaxTotal(200);
        // 將每一個路由基礎的連線新增
        cm.setDefaultMaxPerRoute(40);
        HttpHost httpHost = new HttpHost("www.baidu.com", 80);
        HttpRoute httpRoute = new HttpRoute(httpHost);
        cm.setMaxPerRoute(httpRoute, 80);

        HttpRequestRetryHandler httpRetryHandler = new DefaultHttpRequestRetryHandler(5, false);

        //預設連線配置
        RequestConfig defaultRequestConfig = RequestConfig.custom()
                .setConnectionRequestTimeout(10000)
                .setConnectTimeout(10000).setSocketTimeout(10000).build();

        return HttpClients.custom()
                .setDefaultRequestConfig(defaultRequestConfig)
                .evictExpiredConnections()
                .evictIdleConnections(10, TimeUnit.SECONDS)
                .setConnectionManager(cm)
                .setRetryHandler(httpRetryHandler).build();
    }

在上面的例子中,我們初始化了一個基於連線池的連線管理器。這個連線池中最多持有200個連線,每個目標伺服器最多持有40個連線。其中,我們專門設定了www.baidu.com:80的目標伺服器的可以持有的最大連線數是80個。

如果我們不給httpclient配置指定的連線管理器,在預設情況下,httpclient也會自動使用PoolingHttpClientConnectionManager作為連線管理器。但是PoolingHttpClientConnectionManager預設的maxConnPerRoute和maxConnTotal分別是是2和20。也就是對於每個伺服器最多隻會維護2個連線,看起來有點少。所以,在日常使用時我們儘量使用自己配置的連線管理器比較好。

a. 連線池結構如下

連線池管理器會為每個httpRoute單獨的維護一個連線池。管理著available、leased、pending這些物件。同時,CPool本身也會維護一個總的available、leased、pending物件,是那些routeToPool中available、leased、pending的總和。這裡介紹一下上圖的幾個概念

1. PoolEntity

2. LinkedList available – 存放可用連線

available 表示可用的連線,他們是用LinkedList來存放的。

當連線被使用完後,會被放入連結串列的頭部。同時,當需要取連線時,也是從連結串列的頭部開始遍歷,直到獲取可用的連線為止。這樣做的目的是為了儘快的獲取到可用的連線,因為在連結串列頭部的都是剛放入連結串列的連線,離過期時間肯定是最遠的。如果從連結串列尾部獲取的話,那麼很可能會獲取到失效的連線。

同時,刪除連結串列的失效連線時從連結串列尾部開始遍歷的。

3. HashSet leased – 存放被租用的連線

leased存放正在被使用的連線。如果一個連線被建立或者從available 連結串列中取出,就會先放入leased集合中。同時,用完連線後,就從leased集合中移除掉掉。因為就add和remove操作,所以使用HashSet的資料結構,效率很高。

maxTotal的配置就是available連結串列和leased集合的總和限制。

4. LinkedList pending – 存放等待獲取連線的執行緒的Future

當從池中獲取連線時,如果available連結串列沒有現成可用的連線,且當前路由或連線池已經達到了最大數量的限制,也不能建立連線了,此時不會阻塞整個連線池,而是將當前執行緒用於獲取連線的Future放入pending連結串列的末尾,之後當前執行緒呼叫await(),釋放持有的鎖,並等待被喚醒。

當有連線被release()釋放回連線池時,會從pending連結串列頭獲取future,並喚醒其執行緒繼續獲取連線,做到了先進先出。

5. RouteToPool

每個RouteToPool都會管理一個池,也就是持有pending 、leased 、available 這些物件。

同時,CPool本身也會維護一個總的available、leased、pending物件,是那些routeToPool中連結串列和集合的總和。

b. 分配連線 & 建立連線

分配連線

連線分配的過程:

  1. 如果available中有可用連線,則直接返回該連線
  2. 判斷routeToPool和全域性的連線數量是否分別達到maxPerRoute和MaxTotal的限制,如果都沒達到,則建立一個連線,然後返回
  3. 如果上面的條件都沒達成,就掛起當前執行緒,然後構造一個Future物件放入pending佇列,等待有連線釋放後喚醒自己。

建立連線

當分配到PoolEntry連線實體後,會呼叫establishRoute(),建立socket連線並與ManagedHttpClientConnection繫結。

c. 回收連線 & 保持連線

回收連線

保持連線

連線是否要保持

連線是否被標記成可重用是根據連線是否可以保持。

客戶端如果希望保持長連線,應該在發起請求時告訴伺服器希望伺服器保持長連線(http 1.0設定connection欄位為keep-alive,http 1.1欄位預設保持)。根據伺服器的響應來確定是否保持長連線,判斷原則如下:

  1. 檢查返回response報文頭的Transfer-Encoding欄位,若該欄位值存在且不為chunked,則連線不保持,直接關閉
  2. 檢查返回的response報文頭的Content-Length欄位,若該欄位值為空或者格式不正確(多個長度,值不是整數)或者小於0,則連線不保持,直接關閉
  3. 檢查返回的response報文頭的connection欄位值,如果欄位存在,且欄位值為close 則連線不保持,直接關閉,若欄位值為keep-alive則連線標記為保持。欄位不存在或者不為這兩個中的一個,則http 1.1版本預設為保持,將連線標記為保持, 1.0版本預設為連線不保持,直接關閉
連線保持時間

連線保持時,會更新PoolEntry的expiry到期時間,計算邏輯為:

  1. 如果response頭中的keep-alive欄位中timeout屬性值存在且為正值:newExpiry = System.currentTimeMillis() + timeout;。如果timeout值不存在或者為負數,則newExpiry = Long.MAX_VALUE
  2. 最後拿原來的expire和新的expire取最小值:expire=Math.min(oldExpire,newExpire)

如何釋放連線

httpclient會將我們的請求響應封裝成CloseableHttpResponse物件,我們可以通過這個物件獲取響應的內容。獲取內容時,最終是通過底層的InputStream.read()進行讀取的,httpclient中對於該InputStream的實現是org.apache.http.conn.EofSensorInputStream,它會在read過程中不斷的檢查流是否讀完,一旦檢測到讀完,就會自動根據該連線是否可重用來選擇把連線返回給連線池或者關閉連線。

同時,我們也可以人為功能CloseableHttpResponse.close()方法來回收該連線或者關閉該連線。

d. 連線的過期和失效

每次從連線池中獲取連線時,都會先檢測該連線是否已經過期或者關閉,如果是的話,就分別從routeToPool、httpConnPool的available佇列移除,然後繼續獲取下一個連線。

expire到期

每個連線都有一個expire時間,這個過期時間是連線管理器用來管理連線的,並不是說過了這個時間tcp連線就不能用了。只是連線管理器不會再使用這個連線了。

底層連線被關閉

可能連線被服務端單方面關閉。那麼,httpclient是怎麼判斷連線被關閉的呢?

httpclient會通過socket輸入流嘗試讀取資料,它將soTimeout設定為1ms,然後讀取資料。如果返回的位元組數小於0,則說明該連線關閉了。

/**BHttpConnectionBase#isStale()*/
public boolean isStale() {
    if (!isOpen()) {
        return true;
    }
    try {
        final int bytesRead = fillInputBuffer(1);
        return bytesRead < 0;
    } catch (final SocketTimeoutException ex) {
        return false;
    } catch (final IOException ex) {
        return true;
    }

socket輸入流讀取資料的底層也是通過recv系統指令完成的,執行recv指令時,在阻塞的情況下,如果返回的值是-1,則表示連線被異常關閉。如果返回的是0,則表示連線被關閉了。我們這裡講soTimeout設定為1ms,所以最多隻會阻塞1ms就返回,如果連線被服務端單方面關閉的話,這裡就會返回-1,我們馬上就知道連線被關閉了。

e. 後臺執行緒清除過期和閒置過久的連線

如果每次獲取連線時都要去判斷連線是否過期或者關閉,會造成一定的效能損耗。另外如果連線長時間沒用,長期閒置在那也是一種資源浪費。所以httpclient提供了一個機制,也就是開啟後臺執行緒定時的清除過期和閒置過久的連線。注意,4.5.2版本預設是有這個機制的,以前的版本不太確定有沒有,如果沒有我們也可以自己寫一個。PoolingHttpClientConnectionManager提供了兩個方法,closeExpiredConnectionscloseIdleConnections,分別用來清除過期的連線以及閒置過久的連線。

這個執行緒預設是不開啟的,我們可以在構建httpclient的時候設定

        return HttpClients.custom()
                .setDefaultRequestConfig(defaultRequestConfig)
                //開啟後臺執行緒清除過期的連線
                .evictExpiredConnections()
                //開啟後臺執行緒清除閒置30秒以上的連線
                .evictIdleConnections(30, TimeUnit.SECONDS)
                .setConnectionManager(cm)
                .setRetryHandler(httpRetryHandler).build();

evictExpiredConnections和evictIdleConnections中只要有一個被呼叫了,就會開啟那個後臺執行緒。

        if (!this.connManagerShared) {
            if (closeablesCopy == null) {
                closeablesCopy = new ArrayList<Closeable>(1);
            }
            final HttpClientConnectionManager cm = connManagerCopy;
            //只要有一個為true,就開啟清除執行緒
            if (evictExpiredConnections || evictIdleConnections) {
                final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
                        maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS);
                closeablesCopy.add(new Closeable() {

                    @Override
                    public void close() throws IOException {
                        connectionEvictor.shutdown();
                    }

                });
                //啟動執行緒
                connectionEvictor.start();
            }
            closeablesCopy.add(new Closeable() {

                @Override
                public void close() throws IOException {
                    cm.shutdown();
                }

            });
        }

執行緒開啟後,會執行下面的方法

public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        //休眠sleepTimeMs時間
                        Thread.sleep(sleepTimeMs);
                        //清除過期的連線
                        connectionManager.closeExpiredConnections();
                        if (maxIdleTimeMs > 0) {
                            //清除閒置過久的連線
                            connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (final Exception ex) {
                    exception = ex;
                }

}
//關閉過期的連線
    @Override
    public void closeExpiredConnections() {
        this.log.debug("Closing expired connections");
        this.pool.closeExpired();
    }


    public void closeExpired() {
        final long now = System.currentTimeMillis();
        enumAvailable(new PoolEntryCallback<T, C>() {

            @Override
            public void process(final PoolEntry<T, C> entry) {
                if (entry.isExpired(now)) {
                    entry.close();
                }
            }

        });
    }
//關閉閒置太久的連線 
    @Override
    public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
        }
        this.pool.closeIdle(idleTimeout, tunit);
    }

    public void closeIdle(final long idletime, final TimeUnit tunit) {
        Args.notNull(tunit, "Time unit");
        long time = tunit.toMillis(idletime);
        if (time < 0) {
            time = 0;
        }
        final long deadline = System.currentTimeMillis() - time;
        enumAvailable(new PoolEntryCallback<T, C>() {

            @Override
            public void process(final PoolEntry<T, C> entry) {
                if (entry.getUpdated() <= deadline) {
                    entry.close();
                }
            }

        });
    }

另外,從上面的程式碼可以看出,這個執行緒其實只會關閉過期的連線以及閒置太久的連線,對於那些被服務端異常關閉的連線,是不會處理的。

三、總結

httpclient的介紹大概就到這裡。由於時間有限,並沒有很深入的研究其原始碼實現,但是對其架構也有了一定的認識。再遇到相關的問題時也可以較快的分析出來,實在不行也可以跟蹤原始碼查。

另外,本文如果哪裡有說的不對的地方,歡迎指出,一起交流~

本文參考連結: