OkHttp深入學習(二)——網路
阿新 • • 發佈:2019-01-09
getResponseWithInterceptorChain()@RealCall.class
[email protected]
chain.proceed方法的最底層是通過呼叫getResponse方法獲得對應的response,接著看看該部分的原始碼。 getResponse()@RealCall.class
2、將得到的networkResponse中的cookies存入OkHttpClient中的cookieJar物件中,該物件預設是空的,即不能存入任何url的cookie,如果我們在構造OkHttpClient的時候給予它一個CookieJar那麼OkHttpClient就會將每次獲得的cookie都存入我們定義的CookieJar中。一般是把url作為key,cookies作為value。 3、如果該Request在OkHttpC中的快取中存在對應的Response,則更新該快取 4、利用獲得的networkResponse給userResponse賦值 5、根據得到的Response判斷是否存入快取 因為上面得到的Response來自於方法readNetworkResponse(),那麼接下來我們來分析一下該方法是如何工作的。 readNetworkResponse()@HttpEngine.class
1、connection.allocations.size() < connection.allocationLimit;判斷該RealConnection所服務的StreamAllocation數量是否小於門限值。
2、address.equals(connection.route().address);該Connection的hostname地址等於方法引數的address值。
3、connection.noNewStreams能否被其它StreamAllocation。
4、streamAllocation.acquire(connection);等價於 connection.allocations.add(streamAllocation) ,將StreamAllocation新增到RealConnection的allocations集合中。增加RealConnection的引用計數。當該引用計數為0時考慮回收該RealConnection。
put()@ConnectionPool.class
至此我們對於okhttp的網路通訊功能的實現進行了瞭解,下面對本節進行一下總結:
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket); //是我們傳進來的Request
return chain.proceed(originalRequest);
}
方法真是簡單的一逼啊。利用客戶請求構造一個ApplicationInterceptorChain物件,隨後呼叫其proceed方法。我們接著往下走ApplicationInterceptorChain內部類還算簡單,裡面有一個index和一個request物件。proceed方法使用了遞迴,將使用者的請求通過攔截器一層一層的包裝最後得到一個全新的請求,在遞迴的最底層即所有的攔截器已經執行完畢後,則呼叫getResponse方法獲取response,通過該請求得到的請求結果最後又利用攔截器一層層的包裝最終得到一個全新的網路請求結果。直接這樣說可能不太好理解,下面給出一個普通的攔截器使用例子:class ApplicationInterceptorChain implements Interceptor.Chain{ private final int index; private final Request request; private final boolean forWebSocket; ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) { this.index = index; this.request = request; this.forWebSocket = forWebSocket; } public Response proceed(Request request) throws IOException { if (index < client.interceptors().size()) { Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket); Interceptor interceptor = client.interceptors().get(index); Response interceptedResponse = interceptor.intercept(chain); //該方法內部會出現chain.proceed語句,即實現遞迴呼叫 return interceptedResponse; //最終上面的所有 } return getResponse(request, forWebSocket);//當遞迴到最內層時,才會呼叫到該方法,完成實際的網路請求 } }
這個攔截器很簡單就是在請求傳送前記錄時間,並在結果返回之後記錄時間,中間最重要的就是呼叫了chain.proceed(request)方法和返回一個Response物件,使得遞迴功能得以實現,否則加了該攔截器,請求將無法繼續被執行。因此任何一個正常的迭代器都會出現Response response = chain.proceed(request); return response;這兩行語句的。 ApplicationInterceptorChain攔截器的效果圖如下。class LoggingInterceptor implements Interceptor { @Override public Response intercept(Interceptor.Chain chain) throws IOException { Request request = chain.request(); long t1 = System.nanoTime(); logger.info(String.format("Sending request %s on %s%n%s", request.url(), chain.connection(), request.headers())); Response response = chain.proceed(request); long t2 = System.nanoTime(); logger.info(String.format("Received response for %s in %.1fms%n%s", response.request().url(), (t2 - t1) / 1e6d, response.headers())); return response; } }
chain.proceed方法的最底層是通過呼叫getResponse方法獲得對應的response,接著看看該部分的原始碼。 getResponse()@RealCall.class
Response getResponse(Request request, boolean forWebSocket) throws IOException {
//step1 根據request的body設定request的http報頭
RequestBody body = request.body();
if (body != null) {
Request.Builder requestBuilder = request.newBuilder();
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
//step2 給每個request建立一個HttpEngine,由該引擎負責網路請求
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
} catch (RequestException e) {
// The attempt to interpret the request failed. Give up.
throw e.getCause();
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e.getLastConnectException();
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
HttpEngine retryEngine = engine.recover(e, null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
// Give up; recovery is not possible.
throw e;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
StreamAllocation streamAllocation = engine.close();
streamAllocation.release();
}
}
//正常情況下,前面已經正常執行了engine.sendRequest(); engine.readResponse();
Response response = engine.getResponse();
if (followUp == null) {
if (!forWebSocket) {
engine.releaseStreamAllocation();//成功獲取到連結時將StreamAllocation和Connection解綁
}
return response;
}
//step3 正常情況下上面已經返回response,下面是一些異常情況的處理
........
} //end of while(true)
}
該方法首先設定request的head,隨後由該request構建一個HttpEngine物件,並依次呼叫engine.sendRequest()、engine.readResponse()和engine.getResponse()方法。最後將得到的response結果,進行返回。下面我們就依次來分析一下HttpEngine的sendRequest()、readResponse()、getResponse()方法
HttpEngine.class
首先看一下該類的構造器 HttpEngine()@HttpEngine.classpublic HttpEngine(OkHttpClient client, Request request, boolean bufferRequestBody,
boolean callerWritesRequestBody, boolean forWebSocket, StreamAllocation streamAllocation,
RetryableSink requestBodyOut, Response priorResponse) {
this.client = client;
this.userRequest = request;
this.bufferRequestBody = bufferRequestBody;
this.callerWritesRequestBody = callerWritesRequestBody;
this.forWebSocket = forWebSocket;
this.streamAllocation = streamAllocation != null
? streamAllocation
: new StreamAllocation(client.connectionPool(), createAddress(client, request));
this.requestBodyOut = requestBodyOut;
this.priorResponse = priorResponse;
}
這裡我們主要關注的是傳入的OkHttpClient client, Request request兩個引數,這兩個引數在HttpEngine中的命名分別為client和userRequest。此外還注意到構造器中對this.streamAllocation = streamAllocation
!= null ? streamAllocation : new StreamAllocation(client.connectionPool(), createAddress(client, request));的初始化。預設都是通過呼叫new StreamAllocation(client.connectionPool(),
createAddress(client, request))獲得StreamAllocation物件的。對StreamAllocation它主要負責對RealConnection的管理,我們先不看其原始碼,後面再講。先看sendRequest()方法。
sendRequest()@HttpEngine.classpublic void sendRequest() throws RequestException, RouteException, IOException {
....
Request request = networkRequest(userRequest); //note1
InternalCache responseCache = Internal.instance.internalCache(client); //note2
Response cacheCandidate = responseCache != null? responseCache.get(request) : null;
long now = System.currentTimeMillis();
cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get(); //note3
networkRequest = cacheStrategy.networkRequest;
cacheResponse = cacheStrategy.cacheResponse;
if (responseCache != null) { //OkHttpClient具備快取功能
responseCache.trackResponse(cacheStrategy);
}
if (cacheCandidate != null && cacheResponse == null) { //在OkHttpClient快取中得到response當時經過快取策略分析該response無效
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// 網路請求為空,且本地命中的快取也為空,則返回錯誤的504http報文
if (networkRequest == null && cacheResponse == null) {
userResponse = new Response.Builder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_BODY)
.build();
return;
}
//網路請求為空,本地命中的快取也不為空,返回本地命中的response
if (networkRequest == null) {
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.cacheResponse(stripBody(cacheResponse))
.build();
userResponse = unzip(userResponse);
return;
}
//note4
boolean success = false;
try {
httpStream = connect();
httpStream.setHttpEngine(this);
if (writeRequestHeadersEagerly()) {
long contentLength = OkHeaders.contentLength(request);
if (bufferRequestBody) {
if (contentLength > Integer.MAX_VALUE) {
throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
+ "setChunkedStreamingMode() for requests larger than 2 GiB.");
}
if (contentLength != -1) {
// Buffer a request body of a known length.
httpStream.writeRequestHeaders(networkRequest);
requestBodyOut = new RetryableSink((int) contentLength);
} else {
// Buffer a request body of an unknown length. Don't write request headers until the
// entire body is ready; otherwise we can't set the Content-Length header correctly.
requestBodyOut = new RetryableSink();
}
} else {
httpStream.writeRequestHeaders(networkRequest);
requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
}
}
success = true;
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (!success && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
}
1、該方法很長,我們慢慢消化。networkRequest(userRequest);方法功能上是對request中的http報頭進行相應修改,如cookies,使其符合Http資料報的協議規範。
2、對於InternalCache responseCache = Internal.instance.internalCache(client),一臉的懵逼,之前從來沒有見過,不過不要方,它在我們構建OkHttpClient的時候被初始化的!讓我們回頭擼一把OkHttpClient,果然發現有下面的一段內容, Internal.instance = new Internal() { .....}。這下我們就知道Internal.instance物件在構造OkHttpClient的時候就建立好了。實際上Internal.instance.internalCache(client)等價於
client.internalCache();即得到一個快取操作符。隨後呼叫快取操作符的get方法得到對應的快取結果。
3、cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();利用一定的快取策略對request和cacheCandidate進行處理,並對HttpEngine中的networkRequest和cacheResponse物件進行賦值。如果得到的networkRequest==null,表明快取策略認為該請求不需要訪問網路,如果cacheResponse==null表明該請求在快取中沒有找到想要的結果。可能有童鞋不太懂為何要這樣做,其實舉個栗子就懂了,就是比如我們本地有快取內容,但是可能該內容存放時間太長已經過期了,系統不能再使用該快取,需要重新去網上下載。如果對該部分比較感興趣參考下一篇部落格《OkHttp深入學習(三)——Cache》,本節不對其進行詳細說明。
4、經過上面的分析如果沒有從快取中得到預期的結果,那麼需要通過網路得到預期的response。首先執行的是httpStream = connect(); 該方法內部呼叫streamAllocation.newStream()方法;騷年你還記得嗎?streamAllocation是在構造HttpEngine時建立的物件。呼叫的構造器為new
StreamAllocation(client.connectionPool(), createAddress(client, request)); 對於OkHttpClient的連結池connectionPool部分參考後面的OkHttpClient.class部分的分析。構造完StreamAllocation之後呼叫其newStream方法。得到一個HttpStream物件,該物件是一個已經跟伺服器端進行了三次握手的連結,通過該物件就能向伺服器傳送接收Http報文資料。對於connect()後面的內容,一般情況我們不太會遇到,下面直接開始分析readResponse()方法
readResponse()@HttpEngine.class
public void readResponse() throws IOException {
if (userResponse != null) {
return; // 已經通過快取獲取到了響應
}
if (networkRequest == null && cacheResponse == null) {
throw new IllegalStateException("call sendRequest() first!");
}
if (networkRequest == null) {
return; // No network response to read.
}
//note1
Response networkResponse;
if (forWebSocket) {
httpStream.writeRequestHeaders(networkRequest);
networkResponse = readNetworkResponse();
} else if (!callerWritesRequestBody) {
networkResponse = new NetworkInterceptorChain(0, networkRequest).proceed(networkRequest);
} else {
// Emit the request body's buffer so that everything is in requestBodyOut.
if (bufferedRequestBody != null && bufferedRequestBody.buffer().size() > 0) {
bufferedRequestBody.emit();
}
// Emit the request headers if we haven't yet. We might have just learned the Content-Length.
if (sentRequestMillis == -1) {
if (OkHeaders.contentLength(networkRequest) == -1
&& requestBodyOut instanceof RetryableSink) {
long contentLength = ((RetryableSink) requestBodyOut).contentLength();
networkRequest = networkRequest.newBuilder()
.header("Content-Length", Long.toString(contentLength))
.build();
}
httpStream.writeRequestHeaders(networkRequest);
}
// Write the request body to the socket.
if (requestBodyOut != null) {
if (bufferedRequestBody != null) {
// This also closes the wrapped requestBodyOut.
bufferedRequestBody.close();
} else {
requestBodyOut.close();
}
if (requestBodyOut instanceof RetryableSink) {
httpStream.writeRequestBody((RetryableSink) requestBodyOut);
}
}
networkResponse = readNetworkResponse();
}
//note 2
receiveHeaders(networkResponse.headers());
//note 3
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (validate(cacheResponse, networkResponse)) {
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
releaseStreamAllocation();//呼叫streamAllocation.release();方法,作用就是將StreamAllocation和內部的connection脫離關係
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
InternalCache responseCache = Internal.instance.internalCache(client);
responseCache.trackConditionalCacheHit();
responseCache.update(cacheResponse, stripBody(userResponse));
userResponse = unzip(userResponse);
return;
} else {
closeQuietly(cacheResponse.body());
}
}
//note 4
userResponse = networkResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//note 5
if (hasBody(userResponse)) {
maybeCache();
userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
}
}
1、對於一般情況我們會執行networkResponse = new NetworkInterceptorChain(0, networkRequest).proceed(networkRequest)方法,與ApplicationInterceptorChain的功能類似,這裡是在向網路伺服器傳送資料前巢狀幾層攔截器,採用遞迴方法實現。遞迴方法的最底層會呼叫Response
response = readNetworkResponse();方法得到網路返回資料,之後又通過攔截器進行相應處理。不過無論通過哪條路徑最終都是通過呼叫readNetworkResponse()方法獲得Response的。這裡給出NetWorkInterceptorChain的工作原理圖
2、將得到的networkResponse中的cookies存入OkHttpClient中的cookieJar物件中,該物件預設是空的,即不能存入任何url的cookie,如果我們在構造OkHttpClient的時候給予它一個CookieJar那麼OkHttpClient就會將每次獲得的cookie都存入我們定義的CookieJar中。一般是把url作為key,cookies作為value。 3、如果該Request在OkHttpC中的快取中存在對應的Response,則更新該快取 4、利用獲得的networkResponse給userResponse賦值 5、根據得到的Response判斷是否存入快取 因為上面得到的Response來自於方法readNetworkResponse(),那麼接下來我們來分析一下該方法是如何工作的。 readNetworkResponse()@HttpEngine.class
private Response readNetworkResponse() throws IOException {
httpStream.finishRequest();
Response networkResponse = httpStream.readResponseHeaders()
.request(networkRequest)
.handshake(streamAllocation.connection().handshake())
.header(OkHeaders.SENT_MILLIS, Long.toString(sentRequestMillis))
.header(OkHeaders.RECEIVED_MILLIS, Long.toString(System.currentTimeMillis()))
.build();
if (!forWebSocket) {
networkResponse = networkResponse.newBuilder()
.body(httpStream.openResponseBody(networkResponse))
.build();
}
if ("close".equalsIgnoreCase(networkResponse.request().header("Connection"))
|| "close".equalsIgnoreCase(networkResponse.header("Connection"))) {
streamAllocation.noNewStreams();
}
return networkResponse;
}
該方法邏輯很簡單就是呼叫httpStream.readResponseHeaders()方法和httpStream.openResponseBody方法獲取到Response。那麼我們繼續看HttpStream的readResponseHeaders方法和httpStream.openResponseBody方法。走到這裡突然你可能忘了HttpStream從哪裡得到的。這裡我們複習一下。在構造HttpEngine時建立的物件的時候我們建立了一個streamAllocation物件。呼叫的構造器為new
StreamAllocation(client.connectionPool(), createAddress(client, request)); 對於OkHttpClient的連結池connectionPool部分參考後面的OkHttpClient.class部分的分析。構造完StreamAllocation之後呼叫其newStream方法,得到一個HttpStream物件。我們先看一下getResponse方法,隨後即可進入StreamAllocation和其newStream方法學習。
getResponse()@HttpEngine.classpublic Response getResponse() {
if (userResponse == null) throw new IllegalStateException();
return userResponse;
}
該方法很簡單就是返回HttpEngine中的userResponse域。
StreamAllocation.class
[email protected]public StreamAllocation(ConnectionPool connectionPool, Address address) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
}
這裡額外的建立了一個RouteSelector物件,該物件由Address和路由資料庫構造。
routeDatabase()@StreamAllocation.class
private RouteDatabase routeDatabase() {
return Internal.instance.routeDatabase(connectionPool);
}
該方法其實返回的就是connectionPool.routeDatabase; 該routeDatabase實際上用於儲存之前訪問失敗的路由代理。上面看完了StreamAllocation的構造器,趕緊來看看其newStream方法。
newStream()@StreamAllocation.class
public HttpStream newStream(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws RouteException, IOException {
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, //note1
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpStream resultStream; //note2
if (resultConnection.framedConnection != null) {
resultStream = new Http2xStream(this, resultConnection.framedConnection); //在這裡支援HTTP/2 and SPDY黑科技
} else {
resultConnection.socket().setSoTimeout(readTimeout);
resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
resultStream = new Http1xStream(this, resultConnection.source, resultConnection.sink); //支援1.1協議;
}
synchronized (connectionPool) {
stream = resultStream;
return resultStream;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
1、首先利用findHealthyConnection()方法得到一個RealConnection物件。得到了RealConnection物件意味著我們已經跟服務端有了一條通訊鏈路了,而且通過RealConnection的source 和 sink分別實現向管道中讀寫資料。
2、根據請求的協議不同建立不同的HttpStream,一般情況下我們建立的都是Http1xStream物件,即Http1.X協議。該物件功能就是對request中的資料按照對應的http協議中的格式暴力的通過sink寫入到管道流中,對response中的資料通過source讀取並進行解析。
該部分的重點轉移到了方法findHealthyConnection方法,往下看。
findHealthyConnection()@StreamAllocation.class
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Otherwise do a potentially-slow check to confirm that the pooled connection is still good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate;
}
connectionFailed(new IOException());
}
該方法的大體流程就是先通過findConnection獲取到一個可用的連結,隨後對該連結進行一定的健康性檢查,如果不通過則繼續呼叫findConnection尋找直到找到為止。繼續看findConnection方法
findConnection()@StreamAllocation.class
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException, RouteException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (stream != null) throw new IllegalStateException("stream != null");
if (canceled) throw new IOException("Canceled");
//note 1
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
//note 2
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
//如果是第一次走到這裡 則route為null
}
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
//note 3
synchronized (connectionPool) {
route = selectedRoute;
}
}
RealConnection newConnection = new RealConnection(selectedRoute);
//note 4
acquire(newConnection);
//note 5
synchronized (connectionPool) {
Internal.instance.put(connectionPool, newConnection);
//note 6
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(), connectionRetryEnabled);
//note7
routeDatabase().connected(newConnection.route());
//note 8
return newConnection;
}
1、如果當前StreamAllocation物件中的RealConnection不為空,且沒有新的Stream則直接返回該RealConnection。
2、從執行緒池中獲取一個RealConnection物件,等價於直接呼叫connectionPool.get(address, streamAllocation),如果獲取到了RealConnection則直接返回該RealConnection。step1和step2得到的RealConnection都是已經跟伺服器完成了三次握手鍊接的連線。
3、利用routeSelector.next()方法得到一個route物件;routeSelector在構造StreamAllocation時建立的跟我們的request是繫結的;每個連結都需要選擇一個代理,IP地址,TLS。
4、通過得到的route物件構建一個RealConnection物件,該route包含了客戶請求連結的ip地址和埠號等資訊。
5、connection.allocations.add(new WeakReference<>(this)); 為當前StreamAllocation建立一個弱引用,新增到RealConnection的allocation中,即增加其引用計數。
6、將上面建立的RealConnection新增到OkHttpClient的ConnectionPool連結池中。
7、呼叫RealConnection的connect方法,建立連結,完成跟目標地址的的三次握手。
8、將newConnection對應的route從routeDatabase中移除出去,routeDatabase實際上維護一個集合,裡面存入的route都是failed
最後給大家科普StreamAllocation最後一個方法,該方法用於回收StreamAllocation對應的Connection。呼叫StreamAllocation的noNewStreams和release方法都會呼叫到該方法。一般在任務執行結束後都會通過HttpEngine的releaseStreamAllocation()方法間接呼叫StreamAllocation的deallocate方法將RealConnection和StreamAllocation進行解綁。
deallocate@StreamAllocation.class
private void deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
RealConnection connectionToClose = null;
synchronized (connectionPool) {
if (streamFinished) {
this.stream = null;
}
if (released) {
this.released = true;
}
if (connection != null) {
if (noNewStreams) {
connection.noNewStreams = true; //noNewStreams表明該連結不能提供給新Stream使用
}
if (this.stream == null && (this.released || connection.noNewStreams)) {
release(connection); //note1
if (connection.allocations.isEmpty()) { //note2
connection.idleAtNanos = System.nanoTime();
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) { //note3
connectionToClose = connection;
}
}
connection = null;
}
}
}
if (connectionToClose != null) { //note4
Util.closeQuietly(connectionToClose.socket());
}
}
1、完成的工作就是把當前的StreamAllocation從Realconnection的allocations佇列中移除出去。如果沒有在RealConnection的Allocation集合中找到對StreamAllocation的引用則丟擲異常。
2、Connection的Allocation佇列如果為空,表明該RealConnection沒有被任何StreamAllocation使用。
3、Internal.instance.connectionBecameIdle(connectionPool, connection)等價於pool.connectionBecameIdle(connection); 該方法內部判斷connection.noNewStreams為真,即該Connection不能給新的stream提供服務,則將該Connection從Connectionpool中移除出去,同時返回true。否則讓Connectionpool的清理執行緒去處理,返回false。
4、第三步中返回結果為真,即RealConnection已經從ConnectionPool中移除,則在此處強制呼叫socket的close方法,關閉套接字,回收網路資源。
繼續往下走,那就是RealConnection物件的建立和其connected方法的實現。RealConnection這也是我們所能走到的最底層的類了。勝利的曙光要來了~
RealConnection.class
private Socket rawSocket; //最底層socket public Socket socket; //應用層socket public final List<Reference<StreamAllocation>> allocations = new ArrayList<>(); [email protected]public RealConnection(Route route) {
this.route = route;
}
[email protected]
public void connect(int connectTimeout, int readTimeout, int writeTimeout,List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) throws RouteException {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
Proxy proxy = route.proxy();
Address address = route.address();
if (route.address().sslSocketFactory() == null
&& !connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not supported: " + connectionSpecs));
}
while (protocol == null) {
try {
//note1
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket() //對於ssl的情況
: new Socket(proxy);
//note 2 上面呼叫的方法已經很底層了!!使用的是jdk中的內容了
connectSocket(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
} catch (IOException e) {
......
}
}
}
1、利用proxy的type值判斷構造一個怎樣的Socket,是普通Socket還是SSL性質的Socket。
2、呼叫 connectSocket方法。
connectSocket()@RealConnection.class
private void connectSocket(int connectTimeout, int readTimeout, int writeTimeout, ConnectionSpecSelector connectionSpecSelector) throws IOException {
rawSocket.setSoTimeout(readTimeout);
try {
//note 1
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
throw new ConnectException("Failed to connect to " + route.socketAddress());
}
//note 2
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
if (route.address().sslSocketFactory() != null) {
connectTls(readTimeout, writeTimeout, connectionSpecSelector);
} else {
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
}
if (protocol == Protocol.SPDY_3 || protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.
FramedConnection framedConnection = new FramedConnection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.protocol(protocol)
.listener(this)
.build();
framedConnection.sendConnectionPreface();
// Only assign the framed connection once the preface has been sent successfully.
this.allocationLimit = framedConnection.maxConcurrentStreams();
this.framedConnection = framedConnection;
} else {
this.allocationLimit = 1;
}
}
這部分是最激動人心的地方,這裡我們就要開始跟外界進行通訊了~,也是我們這次對okhttp專案學習的最底層了。
1、Platform.get()等價於Platform.findPlatform();結果是根據平臺的不同得到不同的Platform物件,使用Class.forName方法獲取對應的類物件;下面假設平臺是Android平臺。Platform. get(). connectSocket() 方法等價於呼叫socket.connect(address, connectTimeout); 完成了TCP三次握手
2、source = Okio.buffer(Okio.source(rawSocket));就是從rawSocket獲得一個InputStream再用buffer包裝一下。sink = Okio.buffer(Okio.sink(rawSocket));就是從rawSocket獲得一個OutputStream再用buffer包裝一下。到此為止我們已經獲得了跟伺服器通訊的鏈路。
在本節結束之前我們最後對ConnectionPool和RouteSelector進行介紹
ConnectionPool.class
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
//該類跟Dispatcher.class一樣建立了一個執行緒池,用於維護管理RealConnection
final RouteDatabase routeDatabase = new RouteDatabase();
//該物件建立一個RouteDataBase物件,是一個路由資料庫,該資料庫其實很簡單,裡面維護了一個private final Set<Route> failedRoutes = new LinkedHashSet<>()集合。存放失敗的路由資料
private final Deque<RealConnection> connections = new ArrayDeque<>();
//該集合是ConnectionPool用於儲存當前系統經歷過三次握手可用的RealConnection
ConnectionPool()@ConnectionPool.class
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections; //最大空閒連線數
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
建立的連線池預設維護5條自由連結,且自由連結空閒時間最多為5分鐘。RealConnection是否是自由態由RealConnection中儲存的Allocations集合的大小決定。如果RealConnection.noNewStream==true則表明該RealConnection拒絕為新StreamAllocation服務,往往意味著等待被回收。
get()@ConnectionPool.classRealConnection get(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.allocations.size() < connection.allocationLimit //note1
&& address.equals(connection.route().address) //note2
&& !connection.noNewStreams) { //note3
streamAllocation.acquire(connection); //note4
return connection;
}
}
return null;
}
該方法的作用就是嘗試從集合connections中獲取到一個可重複利用的RealConnection。1、connection.allocations.size() < connection.allocationLimit;判斷該RealConnection所服務的StreamAllocation數量是否小於門限值。
2、address.equals(connection.route().address);該Connection的hostname地址等於方法引數的address值。
3、connection.noNewStreams能否被其它StreamAllocation。
4、streamAllocation.acquire(connection);等價於 connection.allocations.add(streamAllocation) ,將StreamAllocation新增到RealConnection的allocations集合中。增加RealConnection的引用計數。當該引用計數為0時考慮回收該RealConnection。
put()@ConnectionPool.class
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);//note1
}
connections.add(connection);//note2
}
1、對connections集合進行維護,cleanupRunnable的run方法內部會執行cleanup方法,下面我們將對其進行介紹
2、將RealConnection加入到ConnectionPool的connects集合
cleanup()@ConnectionPool.classlong cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// note1
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos; //note 2
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
} // end of for
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) { //note3
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//返回下次正常情況下次需要檢查的等待時間
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//返回下次正常情況下次需要檢查的等待時間
return keepAliveDurationNs;
} else {
//池中沒有自由連結,也沒有正在使用的連結
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket()); //note4
// Cleanup again immediately.
return 0;
}
Connectionpool維護其連結池中連結;該方法是在cleanupRunnable中的run方法中被呼叫。
1、縮減和獲得與該Connection繫結的StreamAllocation數量,如果數量不為0,證明該RealConnection正在被某個StreamAllocation使用,否則進行下面的步驟。
2、獲取該Connection的自由時間,如果該連結自由時間超過當前系統所記錄的Connection最長自由時間,則重新整理當前記錄最大值。這是標記過程
3、執行到這裡,已經得到了當前系統空閒執行緒等待的最長時間,如果該時間大於系統設定的最大自由時間或自由連結數大於系統所能維護的最大自由連結數,則將該RealConnection從連結池中移除出去。
4、執行到這裡,表明剛剛有一個連結從連線池中被移出,此處將關閉該RealConnection對應的socket,即執行socket.close().
[email protected]onnectionPool.class
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}
該方法在StreamAllocation的deallocate中被呼叫。用於將connection回收,或者將RealConnection變成自由態。
我們對ConnectionPool做個小結:
Connection pool 建立了一個執行緒池,用於維護池中的自由連結數,RealConnection採用引用計數的方法判斷一個Connection是否是自由態,如果RealConnection的Allocations集合為空則判斷為自由態。最後採用標記清除的演算法實現對廢棄RealConnection的垃圾回收。當自由態連結數大於門限或者連結空閒時間超過門限值時則對該RealConnection資源進行回收,具體工作就是將RealConnection從ConnectionPool的connections集合中移出,底層呼叫socket.close()關閉網路連線。
RouteSelector.class
在StreamAllocation的findConnection方法中在構造RealConnection之前是需要獲得一個route物件的,而route物件的獲取是通過呼叫routeSelector的next方法來獲取的。該route物件包含了我的url請求對應的ip地址和對應埠號。 /* State for negotiating the next proxy to use. */
private List<Proxy> proxies = Collections.emptyList();
private int nextProxyIndex;
/* State for negotiating the next socket address to use. */
private List<InetSocketAddress> inetSocketAddresses = Collections.emptyList();
private int nextInetSocketAddressIndex;
private final List<Route> postponedRoutes = new ArrayList<>();
RouteSelector()@ RouteSelector.class
public RouteSelector(Address address, RouteDatabase routeDatabase) {
this.address = address;
this.routeDatabase = routeDatabase;
resetNextProxy(address.url(), address.proxy());
}
該構造其中的Address引數是在HttpEngine構建StreamAllocation時建立的,建立方法是通過呼叫HttpEngine的createAddress(client, request)方法來獲得的,方法引數分別為OkHttpClient和Request。在RouteSelector的構造器中會呼叫resetNextProxy方法,引數為客戶請求的url和對應的代理,一般情況剛開始時代理是空。
resetNextProxy()@ RouteSelector.classprivate void resetNextProxy(HttpUrl url, Proxy proxy) {
if (proxy != null) {
//note1
proxies = Collections.singletonList(proxy);
} else {
proxies = new ArrayList<>();
List<Proxy> selectedProxies = address.proxySelector().select(url.uri()); //note2
if (selectedProxies != null) proxies.addAll(selectedProxies);
// Finally try a direct connection. We only try it once!
proxies.removeAll(Collections.singleton(Proxy.NO_PROXY));
proxies.add(Proxy.NO_PROXY);
}
nextProxyIndex = 0;
}
1、如果代理不為空,則直接對proxies賦值
2、address.proxySelector()等價於client.proxySelector(),後者預設等價於ProxySelector.getDefault()等價於new java,net.ProxySelectorImpl();經歷這麼一堆等價於其實簡單講就是呼叫address.proxySelector()等於建立一個java,net.ProxySelectorImpl()物件。隨後呼叫該物件的select方法獲取與該url對應的Proxy
下面我們就來看看next方法具體完成的操作。
next()@ RouteSelector.classpublic Route next() throws IOException {
if (!hasNextInetSocketAddress()) {
if (!hasNextProxy()) {
if (!hasNextPostponed()) {
//前面三個佇列都為空
throw new NoSuchElementException();
}//end if 3
return nextPostponed();//返回最後一個之前失敗的route
}//end if 2
lastProxy = nextProxy();//note1
}//end if 1
lastInetSocketAddress = nextInetSocketAddress(); //note 2
Route route = new Route(address, lastProxy, lastInetSocketAddress); //note 3
if (routeDatabase.shouldPostpone(route)) { //note 4
postponedRoutes.add(route); //note 5
return next(); //note 6
}
return route; //返回可以用的route
}
1、首先執行到這裡,獲取下一個代理,同時重新整理集合inetSocketAddresses的資料
2、獲取到InetSocketAddress
3、利用前面得到的代理和SocketAddress構造一個Route
4、查詢route是否存在於routeDatabase中,即檢驗生成的route是不是可用
5、step4返回真,該route加入到postponedRoutes集合中,如果最後所有的代理都試過了還是不行,則還會將該route重新再嘗試一次
6、遞迴呼叫
nextProxy()@ RouteSelector.classprivate Proxy nextProxy() throws IOException {
if (!hasNextProxy()) {
throw new SocketException("No route to " + address.url().host()
+ "; exhausted proxy configurations: " + proxies);
}
Proxy result = proxies.get(nextProxyIndex++);//note1
resetNextInetSocketAddress(result);
return result;
}
從Proxies物件中獲取一個Proxy物件。proxies的值是在resetNextProxy()方法中獲得的
resetNextInetSocketAddress()@ RouteSelector.classprivate void resetNextInetSocketAddress(Proxy proxy) throws IOException {
// Clear the addresses. Necessary if getAllByName() below throws!
inetSocketAddresses = new ArrayList<>();
String socketHost;
int socketPort;
if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
socketHost = address.url().host();
socketPort = address.url().port();
} else {
SocketAddress proxyAddress = proxy.address();
if (!(proxyAddress instanceof InetSocketAddress)) {
throw new IllegalArgumentException(
"Proxy.address() is not an " + "InetSocketAddress: " + proxyAddress.getClass());
}
InetSocketAddress proxySocketAddress = (InetSocketAddress) proxyAddress;
socketHost = getHostString(proxySocketAddress);
socketPort = proxySocketAddress.getPort();
}
//獲取得到Address對應的IP和Port
if (socketPort < 1 || socketPort > 65535) {
throw new SocketException("No route to " + socketHost + ":" + socketPort
+ "; port is out of range");
}
if (proxy.type() == Proxy.Type.SOCKS) {
inetSocketAddresses.add(InetSocketAddress.createUnresolved(socketHost, socketPort));
} else {
// Try each address for best behavior in mixed IPv4/IPv6 environments.
List<InetAddress> addresses = address.dns().lookup(socketHost); //DNS查詢的結果 url對應多個ip地址和埠號
for (int i = 0, size = addresses.size(); i < size; i++) {
InetAddress inetAddress = addresses.get(i);
inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
}
}
nextInetSocketAddressIndex = 0;
}
根據引數Proxy的值,得到一個java.net.InetSocketAddress型別物件,該物件包含了請求url對應的ip和埠資訊。
至此我們對於okhttp的網路通訊功能的實現進行了瞭解,下面對本節進行一下總結: