OkHttp原始碼詳解之二完結篇
1. 請大家思考幾個問題
在開始本文之前,請大家思考如下幾個問題。並請大家帶著這幾個問題,去本文尋找答案。如果你對下面幾個問題的答案瞭如指掌那本文可以略過不看
- 在瀏覽器中輸入一個網址,按回車後發生了什麼?
- Okhttp的TCP連線建立發生在什麼時候?
- Okhttp的Request請求發生在什麼時候?
- Okio與Okhttp是在什麼時候發生關聯的?
- Okhttp的Interceptor和Chain是怎麼串聯起來的?
- Okhttp同步請求和非同步請求分別是怎麼實現的。非同步請求有限制嗎?
本文將圍繞以上6個問題對Okhttp做一個簡單的講解,如有遺漏的知識點,歡迎在評論區指出,一起探討成長。
2. Http請求的流程
首先來回答第一個問題“在瀏覽器中輸入一個網址,按回車後發生了什麼?”。Http權威指南一書給出的答案是發生了7件事情
- 瀏覽器從url中解析出主機名
- 瀏覽器將伺服器的主機名轉換成伺服器的IP地址
- 瀏覽器將埠號從url中解析出來
- 瀏覽器建立一條與Web伺服器的TCP連線
- 瀏覽器向伺服器傳送一條Http請求報文
- 伺服器向瀏覽器回送一條Http響應報文
- 關閉連線,瀏覽器顯示文件
以上七步是每一個Http請求必須要做的事情,Okhttp庫要實現Http請求也不例外。這七個步驟的每一步,在Okhttp中都有體現。
- HttpUrl類負責步驟1和步驟3的主機名和埠解析
- Dns介面的具體實現負責步驟2的實現
- RealConnection類就是步驟4中的那條TCP連線
- CallServerInterceptor攔截器的intercept方法負責步驟5和步驟6的傳送報文和接收報文
- ConnectPool連線池中提供了關閉TCP連線的方法
- Okio在哪操作?當然是往Socket的OutputStream寫請求報文和從Socket的InputStram讀取響應報文了
至此文章開頭提出的6個問題,前5個都已經有了一個簡單的回答。至於第六個問題,非同步在Okhttp中用的快取執行緒池,理論上快取執行緒池,是當有任務到來,就會從執行緒池中取一個空閒的執行緒或者新建執行緒來處理任務,而且快取執行緒池的執行緒數是Integer.MAX_VALUE。理論上是沒有限制的。但是Dispatcher類線上程池的基礎上,做了強制限制,最多可以同時處理的網路請求數64個,對於同一個主機名,最多可以同時處理5個網路請求。接下來我帶大家從原始碼的角度來尋找這幾個問題的答案
3.初識Okhttp
首先我們來寫兩個簡單的例子來使用Okhttp完成get和post請求。
同步get請求
OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
Response response = client.newCall(request).execute();
return response.body().string();
}
同步post請求
public static final MediaType JSON
= MediaType.parse("application/json; charset=utf-8");
OkHttpClient client = new OkHttpClient();
String post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
Response response = client.newCall(request).execute();
return response.body().string();
}
非同步get請求
OkHttpClient client = new OkHttpClient();
void run(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println(response.body().string());
}
});
}
非同步post請求
public static final MediaType JSON
= MediaType.parse("application/json; charset=utf-8");
OkHttpClient client = new OkHttpClient();
void post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
}
Okhttp完成一次網路請求,過程如下
- 建立一個OkHttpClient物件client
- 建立一個Request物件request,並設定url
- 呼叫client.newCall(request)生成一個Call物件call
- 呼叫call.execute()[同步呼叫]或者call.enqueue(callback)[非同步呼叫]完成網路請求
4. 同步和非同步網路請求過程
同步呼叫過程如下
OkHttpClient.newCall(Request r) => RealCall.execute() => RealCall.getResponseWithInterceptorChain()
非同步呼叫過程如下
OkHttpClient.newCall(Request r) => RealCall.enqueue(Callback) => Dispatcher.enqueue(AsyncCall)
=> 執行緒池執行AsyncCall =>AsyncCall.execute() => RealCall.getResponseWithInterceptorChain()
從上圖可以看出非同步呼叫比同步呼叫步驟更長,也更復雜。在這裡我把整個呼叫過程分成兩個階段,RealCall.getResponseWithInterceptorChain()稱作“網路請求執行階段”,其餘部分稱作“網路請求準備階段”。由於“網路請求執行階段”涉及到鏈式(Chain)呼叫以及各種攔截器的執行比“網路請求準備階段”複雜太多。所以我們先來看“網路請求準備階段”,這個階段也需要分成同步和非同步兩種方式來講解
首先我們來看下準備階段的公共呼叫部分OkHttpClient.newCall(Request r)
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
Call是什麼,在我看來Call可以理解成是對網路請求封裝,它可以執行網路請求,也可以取消網路請求。我們看下Call的類定義
public interface Call extends Cloneable {
Request request();
Response execute() throws IOException;
void enqueue(Callback responseCallback);
void cancel();
boolean isExecuted();
boolean isCanceled();
Call clone();
interface Factory {
Call newCall(Request request);
}
}
- request()返回的Request物件
- execute()同步執行網路操作
- enqueue(Callback responseCallback)非同步執行網路操作
- cancel()取消網路操作
RealCall是Call的實現類。我們重點來看下execute()和enqueue(Callback responseCallback)
execute()方法
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
//這裡只是把call存到Dispatcher的列表中
client.dispatcher().executed(this);
//真正執行網路操作
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
enqueue(Callback callback)
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
我們注意到AsyncCall類,首先它是RealCall類的非靜態內部類,它持有RealCall的物件,它可以做任何RealCall能做的事情。同時它繼承自NamedRunnable。NamedRunnable是Ruannable的子類。它主要有兩個功能,1.可以被執行緒池執行 2.修改當前執行執行緒的執行緒名(方便debug)。由於這幾個類都比較簡單,而且篇幅有限,這裡就不貼程式碼了。請自行查閱
當然到這裡非同步執行的準備階段並沒有結束,它是怎麼被子執行緒執行的呢。這裡我們就需要在Dispatcher類中尋找答案了。Dispatcher是幹嘛用的,它的功能就是負責分發使用者建立的網路請求,以及控制網路請求的個數,以及上一個網路請求結束後,要不要執行等待中的網路請求。下面我們來看下Dispatcher都有哪些成員變數,以及這些成員變數的作用
public final class Dispatcher {
//最多可以同時請求數量
private int maxRequests = 64;
//每個host最大同時請求數量
private int maxRequestsPerHost = 5;
//當沒有任務執行的回撥 比如說執行10個任務,10個任務都執行完了會回撥該介面
private Runnable idleCallback;
/** Executes calls. Created lazily. */
//執行緒池 用的是快取執行緒池(提高吞吐量,並且能及時回收無用的執行緒)
private ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
//等待的非同步任務佇列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在執行的非同步任務佇列 這個放的是Runnable
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//正在執行的同步任務佇列 這個儲存的是RealCall
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}
總結下幾個比較重要的概念
- executorService 執行緒池並且是快取執行緒池,最最大程度提高吞吐量,並且能回收空閒的執行緒
- Deque readyAsyncCalls非同步請求等待佇列,當非同步請求數超過64個,或者單個主機名請求超過5個,網路任務(AsyncCall)會放到該佇列裡。當任務執行完畢,會呼叫promoteCalls()把滿足條件的網路任務(AsyncCall)放到執行緒池中執行
- Deque runningAsyncCalls 正在執行的非同步任務佇列
- Deque runningSyncCalls 正在執行的同步任務佇列
緊接著我們來看下Dispatcher的enqueue(AsyncCall call)
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//如果請求沒有超過限制 用執行緒池執行網路請求
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//如果超過了限制,把請求放到非同步等待佇列中去,什麼時候被喚醒?finished後
readyAsyncCalls.add(call);
}
}
通過執行緒池執行AsyncCall 最終呼叫的是AsyncCall的run(),而run()中呼叫的是AsyncCall的execute()
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
在execute()中呼叫了getResponseWithInterceptorChain(),也就是我們前面說的“網路請求執行階段”。至此,同步和非同步網路請求的“網路請求準備階段”就講解完了,接下來我們講解“網路請求執行階段”
5. Interceptor和Chain
Response getResponseWithInterceptorChain() throws IOException {
List<Interceptor> interceptors = new ArrayList<>();
//使用者自定義的攔截器
interceptors.addAll(client.interceptors());
//重試和重定向攔截器
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//快取攔截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//連線攔截器
interceptors.add(new ConnectInterceptor(client));
//使用者自定義的網路攔截器
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//真正請求伺服器的攔截器
interceptors.add(new CallServerInterceptor(forWebSocket));
//用Chain把List中的interceptors串起來執行
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
要理解上面的程式碼。我們需要搞清楚兩個概念 Interceptor和Chain。我們先來看下它們的定義
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
Connection connection();
}
}
可以舉一個比較形象的例子來解釋它們的關係。就以我們IT行業來講。老闆說想要一個APP,然後把需求轉化成任務流轉給產品經理,產品經理構思了下,然後把任務流轉給設計師,設計師一通設計之後,把設計圖流轉給碼農,碼農接到任務後就開始加班加點的編碼,最終寫出了APP,然後把APP交給設計師,讓設計師檢視介面是否美觀,設計師再把APP流轉給產品經理,產品經理覺得很滿意,最終把APP交付給老闆。老闆看了很滿意,說大傢伙晚上加個雞腿。雖然實際生產中並不是這樣的一個流程,想了很久覺得還是這樣講,更好理解。對應到這個例子中,產品經理、設計師、程式設計師他們都是真正幹活的傢伙,他們對應的是Interceptor。Chain是什麼,是老闆嗎?不是!!Chain只是一套規則,對應的就是例子裡的流轉流程。
對照圖片,“網路請求執行階段”會依次執行Interceptors裡的Interceptor。Interceptor執行分成三步。第一步:處理請求 第二步:執行下個Interceptor 第三步:處理響應
List<Interceptor> interceptors = new ArrayList<>();
//使用者自定義的攔截器
interceptors.addAll(client.interceptors());
//重試和重定向攔截器
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//快取攔截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//連線攔截器
interceptors.add(new ConnectInterceptor(client));
//使用者自定義的網路攔截器
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//真正請求伺服器的攔截器
interceptors.add(new CallServerInterceptor(forWebSocket));
首先執行的就是retryAndFollowUpInterceptor
RetryAndFollowUpInterceptor
//第一部分 根據url解析出主機名 解析埠
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
....省略
while(true){
....
//第二部分處理下一個Interceptor
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
....
//第三部分 處理響應,如果是重定向,用while迴圈,重新處理下一個Interceptor
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
}
BridgeInterceptor和CacheInterceptor請讀者自行分析
ConnectInterceptor中會解析主機DNS並且建立TCP連線,Socket的輸入輸出流通過Source和Sink處理
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//這裡會解析DNS並且建立TCP連線,Socket的輸入輸出流會和Okio的Source和Sink關聯
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
CallServerInterceptor 通過httpCodec的sink向socket傳送請求,並且通過httpCodec的openResponseBody把socket的輸入流寫入到Response中
@Override public Response intercept(Chain chain) throws IOException {
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
// Write the request body, unless an "Expect: 100-continue" expectation failed.
if (responseBuilder == null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))//真正把body寫進去
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
6. 結束語
OkHttp庫還是挺龐大的,涉及到很多Http的基礎知識。這裡只是講解了OkHttp的一小部分。很多細節的東西也沒有深入講解。比如說Socket的建立,連線池的管理,HttpCodec如何解析輸入輸出流,路由的細節。希望有機會可以再深入的講解一番