OKHttp3原始碼分析
1.執行緒池的好處
幾乎所有需要非同步操作或者併發執行任務的程式都可以使用執行緒池,三個好處:
- 降低資源消耗:通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗;
- 提高響應速度:當任務到達的時候,任務可以不需要等到現成的額建立就能立即執行;
- 提高執行緒的可管理性:執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統穩定性,使執行緒池可以進行統一的分配、調優和監控。
2.Okhttp3 執行緒池的配置
實際上是一個CacheThreadPool(核心執行緒為0,最大執行緒為Integer.MAX_VALUE),空閒執行緒等待60秒之後如果沒有新任務的話,就終止程序。
適用於執行很多短期非同步任務的小程式,或者負載比較輕的伺服器.下面的程式碼就說明了:
Dispatcher.java
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出,在Okhttp中,構建了一個閥值為[0, Integer.MAX_VALUE]的執行緒池,它不保留任何最小執行緒數,隨時建立更多的執行緒數,當執行緒空閒時只能活60秒,它使用了一個不儲存元素的阻塞工作佇列,一個叫做”OkHttp Dispatcher”的執行緒工廠。
也就是說,在實際執行中,當收到10個併發請求時,執行緒池會建立十個執行緒,當工作完成後,執行緒池會在60s後相繼關閉所有執行緒。
我們在使用OKhttp3執行 非同步請求任務:
a)通過OkHttpClient物件例項呼叫newCall方法
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("請求失敗");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println("請求成功");
System.out.println(response.body().string());
}
});
b).client.newCall(request)返回的是一個RealCall物件:
OkHttpClient.java
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
c).RealCall物件中的enqueue方法:
RealCall.java
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
這裡你也許會有疑問,這個方法傳遞進去了Callback物件,例項化了一個AsyncCall物件(new AsyncCall(responseCallback)),那我們的請求將在哪裡執行呢?
原來RealCall的建構函式中將請求物件賦值給了RealCall中的成員變數originalRequest
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
然後再來看看AsyncCall類(這個類是RealCall類的內部類,所以這個類將可以得到RealCall的originalRequest ,也就是我們的請求),AsyncCall類繼承於NamedRunnable類,NamedRunnable類是實現了Runnable介面的。
看一下NamedRunnable類的定義:
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
從程式碼中我們可以看到NamedRunnable 的子類會在其run()方法去執行execute()方法。
現在再回到AsyncCall(NamedRunnable 的子類)的execute()方法:
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
我們在程式碼中可以看到Response response = getResponseWithInterceptorChain();
這行程式碼,這究竟是什麼:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
這裡我們終於知道了我們的請求最終是經過執行緒池的呼叫執行緒執行中的run(),然後run()呼叫execute()方法,接著execute()方法呼叫getResponseWithInterceptorChain(),最後我們的請求才得以處理。
d).再回到client.dispatcher()返回的是一個dispatcher物件,看看dispatcher物件中的enqueue()方法:
Dispatcher.java
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
當執行中的非同步請求任務佇列(runningAsyncCalls)的長度小於執行緒池的最大請求數(maxRequests),並且對同一個主機的請求數目小於初始化maxRequestsPerHost的時候,將請求加入執行中非同步請求佇列,並讓執行緒池去執行這個請求
e).看看這行程式碼:executorService().execute(call);
這行程式碼會執行我們的請求,不明白的回去看c)
executorService()返回的是一個CacheThreadPool,這個執行緒池是一個大小無界的執行緒池,適用於執行很多的短期的非同步任務的小程式,或者負載較輕的伺服器。
這個執行緒池使用的阻塞佇列是SynchronousQueue,這是一個沒有容量的阻塞佇列。每一個插入操作(offer)必須等待另一個執行緒對應的移除操作(poll),反之亦然。下圖是CacheThreadPool中任務傳遞的示意圖:
看看CacheThreadPool的execute方法的執行示意圖:
1)首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒
正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主執行緒執行
offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方
法執行完成;否則執行下面的步驟2)。
2)當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟1)將失
敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。
3)在步驟2)中新建立的執行緒將任務執行完後,會執行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閒線
程最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執
行步驟1)),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於
空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任何資源。
至此,使用OKhttp3發起一次非同步請求的流程就走完了!
3.執行緒池是如何實現複用的
回到2.c)中的程式碼,
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
最後會執行client.dispatcher().finished(this);
,進到Dispatcher類中的
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
看到if (promoteCalls) promoteCalls();,進入 promoteCalls()方法
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
發現執行緒池中執行緒的複用正是在這裡得到了複用,這個方法將readyAsyncCalls中的任務加到了runningAsyncCalls佇列中,以複用已經空閒的執行緒。