Okhttp檔案上傳原始碼分析
一直搞開發,也使用各種框架,但是基本上也從來沒研究過這些框架的底層是如何實現的。像我們客戶端,一般重要的事情就是:網路請求、圖片處理、操作資料庫、展現View介面,剛好今天哥們有問我一個okhttp網路請求的問題,幫他看完了之後,就詳細的走了一遍okhttp框架實現檔案上傳的基本流程,也算是對網路請求這塊的東西學習一下。
先來看一下我按照思路整理出來的詳細的流程圖。說起流程圖,大家還不能單獨看,必須要結合原始碼一起,流程圖時刻可以幫助我們找到當前的點,但是沒有原始碼,光看流程圖,你根本不知道這是幹什麼的,所以大家參考的時候,還是需要對照原始碼一起分析,哦,還需要說一下,我當前使用的是okhttp-3.2.0.jar的包,可能不同的版本,細節實現會有一些差別。
流程圖當中幾個重要的節點我都要紅色標註出來了。我要在客戶端測試的程式碼非常簡單,因為程式碼片現在越來越多了,這裡就不貼程式碼了,直接上一張圖,因為程式碼很少,所以大家看著也方便。
按照上圖的流程,我們把整個檔案上傳的過程三為三個小節來分析:1、RealCall.getResponse之前的邏輯;2、RealCall類的getResponse方法中呼叫HttpEngine.sendRequest傳送請求;3、RealCall類的getResponse方法中呼叫HttpEngine.readResponse讀取響應。
1、RealCall.getResponse之前的邏輯
這一步中的邏輯都比較簡單,就是一些資料封裝類的操作,首先獲取到我們要上傳的檔案,以我們的目標檔案為引數建立一個RequestBody物件,我們來看一下RequestBody類的create(MediaType.parse(TYPE), file)方法的實現:
這個方法的實現非常簡單,就是直接構造了一個RequestBody物件返回了,需要給大家說的就是writeTo方法,從這個方法邏輯中也可以看出,我們上傳檔案時候,就是在這裡把檔案轉換成Source物件,然後通過BufferedSink寫入流,最後傳送出去的。接著後面的兩句建立requestBody、requestPostFile我們就不展開了,就是根據我們的資料建立一個請求物件。我們繼續來看最後一句程式碼,client.newCall(requestPostFile)這一句就是建立一個RealCall物件,傳入的引數requestPostFile當中儲存了我們要上傳的檔案資料索引,我們繼續來看RealCall類的enqueue方法:public static RequestBody create(final MediaType contentType, final File file) { if(file == null) { throw new NullPointerException("content == null"); } else { return new RequestBody() { public MediaType contentType() { return contentType; } public long contentLength() { return file.length(); } public void writeTo(BufferedSink sink) throws IOException { Source source = null; try { source = Okio.source(file); sink.writeAll(source); } finally { Util.closeQuietly(source); } } }; } }
public void enqueue(Callback responseCallback) {
this.enqueue(responseCallback, false);
}
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized(this) {
if(this.executed) {
throw new IllegalStateException("Already Executed");
}
this.executed = true;
}
this.client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback, forWebSocket));
}
這裡的實現也比較簡單,就是判斷當前的Call物件是否已執行,一個Call物件不能重複執行,這點和我們的執行緒物件一樣。然後以傳進來的Callback物件構造一個RealCall.AsyncCall,這裡的forWebSocket是上面設定的,為false,同時獲取當前client的分發器,並將構造好的RealCall.AsyncCall交給分發器執行。Dispatcher分發器當中使用的是一個執行緒池來處理我們的請求的,在它的enqueue方法中就直接執行我們的請求,RealCall.AsyncCall是繼承NamedRunnable,而NamedRunnable又實現了Runnable,所以當任務被處理時,就會回撥RealCall.AsyncCall類的execute方法,我們來看一下它的實現:
protected void execute() {
boolean signalledCallback = false;
try {
Response e = RealCall.this.getResponseWithInterceptorChain(this.forWebSocket);
if(RealCall.this.canceled) {
signalledCallback = true;
this.responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
this.responseCallback.onResponse(RealCall.this, e);
}
} catch (IOException var6) {
if(signalledCallback) {
Internal.logger.log(Level.INFO, "Callback failure for " + RealCall.this.toLoggableString(), var6);
} else {
this.responseCallback.onFailure(RealCall.this, var6);
}
} finally {
RealCall.this.client.dispatcher().finished(this);
}
}
看一下這個方法的程式碼,再對比一下我們的流程圖,就可以看到,最終回撥到我們應用層就是在這裡的。而返回給我們的Response物件就是通過getResponseWithInterceptorChain獲取到的。當我們的請求因異常而失敗時,okhttp框架中都會將其取消,所以最終的結果就根據RealCall.this.canceled標誌位來判斷,如果取消了,那麼肯定是請求失敗了,否則請求就是成功的,最後請求完成後,將當前的call從分發器中移除。好,對照著流程圖,接下來我們看一下RealCall類的getResponseWithInterceptorChain方法的實現,它當中就是直接呼叫ApplicationInterceptorChain內部類物件,然後直接呼叫它的proceed方法。構造它時傳進來的第一個引數index為0,而我們當前的實現中,client中的攔截器數量也是0,所以就直接呼叫RealCall.this.getResponse進行處理。這個方法也是比較核心的方法了,從我們的流程圖就可以看出來,在它當中呼叫了我們下面要講的兩具節點方法:sendRequest()、readResponse(),傳送請求,讀取響應,讀取響應完成後,會將結果儲存在HttpEngine類的成員變數userResponse當中,最後將它通過回撥返回給應用層。我們來看一下getResponse方法的實現:
Response getResponse(Request request, boolean forWebSocket) throws IOException {
RequestBody body = request.body();
if(body != null) {
Builder followUpCount = request.newBuilder();
MediaType releaseConnection = body.contentType();
if(releaseConnection != null) {
followUpCount.header("Content-Type", releaseConnection.toString());
}
long response = body.contentLength();
if(response != -1L) {
followUpCount.header("Content-Length", Long.toString(response));
followUpCount.removeHeader("Transfer-Encoding");
} else {
followUpCount.header("Transfer-Encoding", "chunked");
followUpCount.removeHeader("Content-Length");
}
request = followUpCount.build();
}
this.engine = new HttpEngine(this.client, request, false, false, forWebSocket, (StreamAllocation)null, (RetryableSink)null, (Response)null);
int var20 = 0;
while(!this.canceled) {
boolean var21 = true;
boolean var15 = false;
StreamAllocation streamAllocation;
label173: {
label172: {
try {
HttpEngine followUp;
try {
var15 = true;
this.engine.sendRequest();
this.engine.readResponse();
var21 = false;
var15 = false;
break label173;
} catch (RequestException var16) {
throw var16.getCause();
} catch (RouteException var17) {
followUp = this.engine.recover(var17.getLastConnectException(), (Sink)null);
if(followUp == null) {
throw var17.getLastConnectException();
}
} catch (IOException var18) {
followUp = this.engine.recover(var18, (Sink)null);
if(followUp != null) {
var21 = false;
this.engine = followUp;
var15 = false;
break label172;
}
throw var18;
}
var21 = false;
this.engine = followUp;
var15 = false;
} finally {
if(var15) {
if(var21) {
StreamAllocation streamAllocation1 = this.engine.close();
streamAllocation1.release();
}
}
}
if(var21) {
streamAllocation = this.engine.close();
streamAllocation.release();
}
continue;
}
if(var21) {
streamAllocation = this.engine.close();
streamAllocation.release();
}
continue;
}
if(var21) {
StreamAllocation var22 = this.engine.close();
var22.release();
}
Response var23 = this.engine.getResponse();
Request var24 = this.engine.followUpRequest();
if(var24 == null) {
if(!forWebSocket) {
this.engine.releaseStreamAllocation();
}
return var23;
}
streamAllocation = this.engine.close();
++var20;
if(var20 > 20) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + var20);
}
if(!this.engine.sameConnection(var24.url())) {
streamAllocation.release();
streamAllocation = null;
}
this.engine = new HttpEngine(this.client, var24, false, false, forWebSocket, streamAllocation, (RetryableSink)null, var23);
}
this.engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
先構造一個HttpEngine物件,然後以!this.canceled為條件進行迴圈,while迴圈中的邏輯採用了java label的標籤寫法,應該和C++中的goto是一樣的道理,從實際開發中來看,label標籤已經很少採用了,所以一般也不建議大家使用這樣的寫法,這樣的程式碼易讀性不好。這裡主要就是第一個try程式碼塊中處理請求,沒有異常就直接跳出label173,然後呼叫this.engine.followUpRequest()取出下一個請求,再以它的返回值為引數構造一個新的HttpEngine,繼續在while迴圈中處理。當然如果followUpRequest取回來的為空,那也就是說只有一個請求,並且已經處理完了,那就直接返回當前的結果var23了。這個方法當中的一些變數命名也不是很好,根本看不出來是啥意思,不知道開發者是怎麼想的。這個方法分析完了,就要進入主要的邏輯了。
2、RealCall類的getResponse方法中呼叫HttpEngine.sendRequest傳送請求
再來對照流程圖,從中也可以看到,這一步的任務就是傳送訊息頭,我們來看一下這個方法的實現:
public void sendRequest() throws RequestException, RouteException, IOException {
if(this.cacheStrategy == null) {
if(this.httpStream != null) {
throw new IllegalStateException();
} else {
Request request = this.networkRequest(this.userRequest);
InternalCache responseCache = Internal.instance.internalCache(this.client);
Response cacheCandidate = responseCache != null?responseCache.get(request):null;
long now = System.currentTimeMillis();
this.cacheStrategy = (new Factory(now, request, cacheCandidate)).get();
this.networkRequest = this.cacheStrategy.networkRequest;
this.cacheResponse = this.cacheStrategy.cacheResponse;
if(responseCache != null) {
responseCache.trackResponse(this.cacheStrategy);
}
if(cacheCandidate != null && this.cacheResponse == null) {
Util.closeQuietly(cacheCandidate.body());
}
if(this.networkRequest == null && this.cacheResponse == null) {
this.userResponse = (new Builder()).request(this.userRequest).priorResponse(stripBody(this.priorResponse)).protocol(Protocol.HTTP_1_1).code(504).message("Unsatisfiable Request (only-if-cached)").body(EMPTY_BODY).build();
} else if(this.networkRequest == null) {
this.userResponse = this.cacheResponse.newBuilder().request(this.userRequest).priorResponse(stripBody(this.priorResponse)).cacheResponse(stripBody(this.cacheResponse)).build();
this.userResponse = this.unzip(this.userResponse);
} else {
boolean success = false;
try {
this.httpStream = this.connect();
this.httpStream.setHttpEngine(this);
if(this.writeRequestHeadersEagerly()) {
long contentLength = OkHeaders.contentLength(request);
if(this.bufferRequestBody) {
if(contentLength > 2147483647L) {
throw new IllegalStateException("Use setFixedLengthStreamingMode() or setChunkedStreamingMode() for requests larger than 2 GiB.");
}
if(contentLength != -1L) {
this.httpStream.writeRequestHeaders(this.networkRequest);
this.requestBodyOut = new RetryableSink((int)contentLength);
} else {
this.requestBodyOut = new RetryableSink();
}
} else {
this.httpStream.writeRequestHeaders(this.networkRequest);
this.requestBodyOut = this.httpStream.createRequestBody(this.networkRequest, contentLength);
}
}
success = true;
} finally {
if(!success && cacheCandidate != null) {
Util.closeQuietly(cacheCandidate.body());
}
}
}
}
}
}
在這個方法中,networkRequest不為空,所以就進入最後一個else分支進行處理,第一句this.httpStream = this.connect()就是和服務端取得連線的過程,返回一個HttpStream物件,這個過程非常複雜,也非常重要。因為後邊的資料傳輸就是在這個連線的基礎上進行的,像常用的HTTP連線,BT連線等等的都會有這個過程,大家可以去看一下android bluetooth藍芽模組的原始碼,也是相同的,我們這裡就不展開了。bufferRequestBody是構造HttpEngine物件時傳進來的,值為false,所以接下來就是呼叫writeRequestHeaders、createRequestBody來處理了。connect()返回來的是一個Http2xStream物件,它是實現了HttpStream的,writeRequestHeaders是由它來實現的,我們來看一下writeRequestHeaders方法的實現:
public void writeRequestHeaders(Request request) throws IOException {
if(this.stream == null) {
this.httpEngine.writingRequestHeaders();
boolean permitsRequestBody = this.httpEngine.permitsRequestBody(request);
List requestHeaders = this.framedConnection.getProtocol() == Protocol.HTTP_2?http2HeadersList(request):spdy3HeadersList(request);
boolean hasResponseBody = true;
this.stream = this.framedConnection.newStream(requestHeaders, permitsRequestBody, hasResponseBody);
this.stream.readTimeout().timeout((long)this.httpEngine.client.readTimeoutMillis(), TimeUnit.MILLISECONDS);
this.stream.writeTimeout().timeout((long)this.httpEngine.client.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
}
}
這個方法當中就是將當前的請求頭進行轉換,得到一個List requestHeaders資料,然後將它作為引數,呼叫FramedConnection類的newStream方法,我們接著來看newStream方法的實現:
public FramedStream newStream(List requestHeaders, boolean out, boolean in) throws IOException {
return this.newStream(0, requestHeaders, out, in);
}
private FramedStream newStream(int associatedStreamId, List requestHeaders, boolean out, boolean in) throws IOException {
boolean outFinished = !out;
boolean inFinished = !in;
FrameWriter var9 = this.frameWriter;
FramedStream stream;
synchronized(this.frameWriter) {
int streamId;
synchronized(this) {
if(this.shutdown) {
throw new IOException("shutdown");
}
streamId = this.nextStreamId;
this.nextStreamId += 2;
stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
if(stream.isOpen()) {
this.streams.put(Integer.valueOf(streamId), stream);
this.setIdle(false);
}
}
if(associatedStreamId == 0) {
this.frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, requestHeaders);
} else {
if(this.client) {
throw new IllegalArgumentException("client streams shouldn\'t have associated stream IDs");
}
this.frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
}
}
if(!out) {
this.frameWriter.flush();
}
return stream;
}
這個方法當中就是構造一個FramedStream物件,associatedStreamId值為0,是呼叫時傳進來的,然後呼叫成員變數this.frameWriter.synStream方法將資料寫入流中,往下的我們就不跟蹤了。
再回來,對照流程圖,看一下Http2xStream類的createRequestBody方法。它當就很簡單,就是呼叫getSink()將上一步返回來的FramedStream物件的成員變數sink取出來返回給呼叫者。注意,這裡返回的實體是FramedStream.FramedDataSink物件,它是實現了Sink介面的。
好了,請求頭髮送完了,再來對照一下流程圖,接下來就是讀取響應了。
3、RealCall類的getResponse方法中呼叫HttpEngine.readResponse讀取響應
我們來看一下這個方法的程式碼:
public void readResponse() throws IOException {
if(this.userResponse == null) {
if(this.networkRequest == null && this.cacheResponse == null) {
throw new IllegalStateException("call sendRequest() first!");
} else if(this.networkRequest != null) {
Response networkResponse;
if(this.forWebSocket) {
this.httpStream.writeRequestHeaders(this.networkRequest);
networkResponse = this.readNetworkResponse();
} else if(!this.callerWritesRequestBody) {
networkResponse = (new HttpEngine.NetworkInterceptorChain(0, this.networkRequest)).proceed(this.networkRequest);
} else {
if(this.bufferedRequestBody != null && this.bufferedRequestBody.buffer().size() > 0L) {
this.bufferedRequestBody.emit();
}
if(this.sentRequestMillis == -1L) {
if(OkHeaders.contentLength(this.networkRequest) == -1L && this.requestBodyOut instanceof RetryableSink) {
long responseCache = ((RetryableSink)this.requestBodyOut).contentLength();
this.networkRequest = this.networkRequest.newBuilder().header("Content-Length", Long.toString(responseCache)).build();
}
this.httpStream.writeRequestHeaders(this.networkRequest);
}
if(this.requestBodyOut != null) {
if(this.bufferedRequestBody != null) {
this.bufferedRequestBody.close();
} else {
this.requestBodyOut.close();
}
if(this.requestBodyOut instanceof RetryableSink) {
this.httpStream.writeRequestBody((RetryableSink)this.requestBodyOut);
}
}
networkResponse = this.readNetworkResponse();
}
this.receiveHeaders(networkResponse.headers());
if(this.cacheResponse != null) {
if(validate(this.cacheResponse, networkResponse)) {
this.userResponse = this.cacheResponse.newBuilder().request(this.userRequest).priorResponse(stripBody(this.priorResponse)).headers(combine(this.cacheResponse.headers(), networkResponse.headers())).cacheResponse(stripBody(this.cacheResponse)).networkResponse(stripBody(networkResponse)).build();
networkResponse.body().close();
this.releaseStreamAllocation();
InternalCache responseCache1 = Internal.instance.internalCache(this.client);
responseCache1.trackConditionalCacheHit();
responseCache1.update(this.cacheResponse, stripBody(this.userResponse));
this.userResponse = this.unzip(this.userResponse);
return;
}
Util.closeQuietly(this.cacheResponse.body());
}
this.userResponse = networkResponse.newBuilder().request(this.userRequest).priorResponse(stripBody(this.priorResponse)).cacheResponse(stripBody(this.cacheResponse)).networkResponse(stripBody(networkResponse)).build();
if(hasBody(this.userResponse)) {
this.maybeCache();
this.userResponse = this.unzip(this.cacheWritingResponse(this.storeRequest, this.userResponse));
}
}
}
}
在這裡的邏輯中,構造HttpEngine物件時,傳入的兩個引數forWebSocket、callerWritesRequestBody都是false,所以這裡就執行第二個else if分支。可以看到,在okhttp框架中,有很多攔截器,它們以Chain的形式組成一條鏈,至於這塊的東西,我沒有去深入研究,就不展開了,我們繼續我們的流程分析。這裡就是構造一個HttpEngine.NetworkInterceptorChain物件,然後呼叫它的proceed方法,從我們的流程圖當中也可以看到,真正的檔案傳輸就是在這裡進行的。我們來看一下proceed方法的邏輯:
public Response proceed(Request request) throws IOException {
++this.calls;
if(this.index > 0) {
Interceptor response = (Interceptor)HttpEngine.this.client.networkInterceptors().get(this.index - 1);
Address code = this.connection().route().address();
if(!request.url().host().equals(code.url().host()) || request.url().port() != code.url().port()) {
throw new IllegalStateException("network interceptor " + response + " must retain the same host and port");
}
if(this.calls > 1) {
throw new IllegalStateException("network interceptor " + response + " must call proceed() exactly once");
}
}
if(this.index < HttpEngine.this.client.networkInterceptors().size()) {
HttpEngine.NetworkInterceptorChain var7 = HttpEngine.this.new NetworkInterceptorChain(this.index + 1, request);
Interceptor var10 = (Interceptor)HttpEngine.this.client.networkInterceptors().get(this.index);
Response interceptedResponse = var10.intercept(var7);
if(var7.calls != 1) {
throw new IllegalStateException("network interceptor " + var10 + " must call proceed() exactly once");
} else if(interceptedResponse == null) {
throw new NullPointerException("network interceptor " + var10 + " returned null");
} else {
return interceptedResponse;
}
} else {
HttpEngine.this.httpStream.writeRequestHeaders(request);
HttpEngine.this.networkRequest = request;
if(HttpEngine.this.permitsRequestBody(request) && request.body() != null) {
Sink var5 = HttpEngine.this.httpStream.createRequestBody(request, request.body().contentLength());
BufferedSink var8 = Okio.buffer(var5);
request.body().writeTo(var8);
var8.close();
}
Response var6 = HttpEngine.this.readNetworkResponse();
int var9 = var6.code();
if((var9 == 204 || var9 == 205) && var6.body().contentLength() > 0L) {
throw new ProtocolException("HTTP " + var9 + " had non-zero Content-Length: " + var6.body().contentLength());
} else {
return var6;
}
}
}
我們在上一步構造HttpEngine.NetworkInterceptorChain物件時,傳入的第一個引數index為0,而在這個呼叫過程當中,client中的攔截器的數量也是0,所以就直接執行到最後的一個else分支當中了。在這裡判斷我們的request.body() != null,因為我們要上傳檔案,所以這個條件為true,那麼就呼叫構造好一個BufferedSink物件,然後呼叫request.body().writeTo(var8)把我們的目標檔案寫入流中。request.body()獲取到的是一個RequestBody物件,但是它的writeTo方法定義的是一個虛擬函式,而實現就是在我們一開始呼叫create(final MediaType contentType, final File file)構造Request請求當中,直接new了一個new RequestBody(),大家可以回頭看看我們本篇的第一個程式碼片,這裡的writeTo就是回撥到那裡的,而回調時的引數BufferedSink也是在回撥前構造好的。writeTo方法中先呼叫Okio.source(file),將我們的目標檔案轉換成Source型別,然後再sink.writeAll(source)把資料寫進去,這裡的流傳輸底層實現應該都是一樣的,在看藍芽模組檔案傳輸的程式碼時,也可以看到這樣的邏輯,我們的檔案如何傳過去的呢?就是把檔案通過Stream轉換成流,然後將流資料通過Socket傳送給目標。
好了,我們再回到HttpEngine.NetworkInterceptorChain類的proceed方法當中,檔案資料寫完後,就呼叫HttpEngine.this.readNetworkResponse()讀取響應,最終返回給應用層的Response也就是在這裡生成的。這裡完成後,再往上退一步,就到了RealCall類的getResponse方法中,請求已經處理完了,響應資料也拿回來了,最後通過this.engine.getResponse()獲取到響應,然後就返回給應用層了。
在這篇文章中,我們只是大概走了一下Okhttp網路框架中的一個最簡單的流程,還有很多非常重要的細節沒有深入研究,比如將檔案讀入到流之後,最後如何通過Socket傳送過去的,中間過程還有涉及到很多非常重要的物件,如檔案轉換後得到的Source,寫檔案時使用的BufferedSink等等,還有很多重量級物件,大家如果有興趣,可以自己研究一下。雖然我們沒有進一步深入分析這些細節,但是從這個過程中,我們還是瞭解到了網路通訊的一些實現,也希望能給大家帶來一些幫助。
這節課就到這裡了,謝謝大家,也希望大家關注我的部落格!!