OkHttp Dispatcher的排程過程分析
Dispatcher是負責對okhttp所有的請求進行排程管理的類。可以通過Dispatcher獲取,或者取消所有請求。這裡指的一個請求就是對應的Call,並不是指Request,下面出現的所有的請求都是指Call。這裡通過分析跟蹤okhttp傳送請求的過程來分析Dispatcher是如何維護和排程我們發出的所有請求的。
Call其實就是對Request的封裝。
OkHttp請求方式
通過okhttp傳送請求主要有兩種方式。
- 通過
execute()
呼叫,此時request會被馬上發出, 直到返回response或者發生錯誤前會一直阻塞。可以理解為一個立即執行的同步請求。 - 通過
enqueue()
第一種方式
通過execute()
呼叫,一般是這樣的
okHttpClient.newCall(request).execute();
通過OkHttpClient的程式碼可以看出newCall()
方法其實是new了一個RealCall,所以這裡直接檢視RealCall的execute()
方法。
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall的execute()
方法,這裡只看下核心的程式碼:
@Override public Response execute() throws IOException {
synchronized (this) {
//此處除去一些其他程式碼
//...
try {
//通知Dispatcher這個Call正在被執行,同時將此Call交給Dispatcher
//Dispatcher可以對此Call進行管理
client.dispatcher().executed(this );
//請求的過程,注意這行程式碼是阻塞的,直到返回result!
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
//此時這個請求已經執行完畢了,通知Dispatcher,不要再維護這個Call了
client.dispatcher().finished(this);
}
}
首先注意這行程式碼
client.dispatcher().executed(this);
它是呼叫的Dispatcher的executed()
方法,注意看方法名是executed並不是execute。接下來去Dispatcher裡看下這個方法做了什麼。
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
看註釋就明白了,這裡他只是一個通知的作用,通知Dispatcher我這個call立即要被或者正在被執行,然後Dispatcher會把加入一個名為runningSyncCalls的雙端佇列中,這個佇列中儲存著所有的正在執行的同步請求。這樣Dispatcher就可以很方便的對所有的同步請求進行管理了。既然有新增,那麼也應該有刪除,在請求執行完畢時呼叫了這行程式碼:
client.dispatcher().finished(this);
通過字面意思理解他應該就是刪除的操作,通知Dispatcher這個請求已經被執行完畢了。這裡暫時理解為呼叫finished方法就是將此call從runningSyncCalls中移除,後面會再討論finished方法的細節。
因為同步請求是被馬上執行的,所以Dispatcher能對同步請求進行的排程也只有cancel了。具體可以通過呼叫Dispatcher.cancelAll()
方法進行取消。
所以真正執行請求的只有這行程式碼了。
Response result = getResponseWithInterceptorChain();
這個方法先不管他,就可以理解為這行程式碼的執行就是請求從發出到完成的過程。在分析攔截器的實現原理的時候再來討論。
第二種方式
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
通過上面我們已經知道這裡呼叫的也是RealCall的enqueue方法,我們直接來看程式碼:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//判斷是否已經執行過了
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//捕獲呼叫棧的資訊,用來分析連線洩露
captureCallStackTrace();
//封裝一個AsyncCall交給Dispatcher排程
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
通過上面的程式碼可以看出呼叫enqueue()
方法,其實是呼叫了Dispatcher的enqueue()
方法,並且new了一個AsyncCall作為引數。AsyncCall為RealCall的一個內部類,下面繼續看AsyncCall類裡到底做了什麼。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
//...
/**
*真正執行發出請求的地方,為了看起來清晰,精簡了部分程式碼
*/
@Override protected void execute() {
try {
//請求的過程,注意這裡也是阻塞的
Response response = getResponseWithInterceptorChain();
//先不管這個Interceptor是幹嘛的,下面的程式碼可以理解為:
//如果沒有被取消,並且沒有發生異常,回撥onResponse方法。
//如果發生了異常或者被取消,回撥onFailure方法。
if (retryAndFollowUpInterceptor.isCanceled()) {
//此請求被取消了,回撥onFailure
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//此請求成功了,回撥onResponse
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
//發生了異常,回撥onFailure
responseCallback.onFailure(RealCall.this, e);
} finally {
//通知Dispatcher Call被執行完畢了
client.dispatcher().finished(this);
}
}
}
可以看到AsyncCall的execute()
就是具體請求執行的地方,只不過和上面的RealCall的execute()
方法相比,多了回撥的處理。retryAndFollowUpInterceptor其實是負責請求超時的重試和重定向操作的,retryAndFollowUpInterceptor.isCanceled()
就是用來判斷這個請求是否被取消了,這裡就不深入展開了。那麼AsyncCall的execute()
方法是怎麼被執行的呢,繼續來看AsyncCall的父類NamedRunnable。
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//注意這裡呼叫了execute方法
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
可以看到NamedRunnable其實就是一個實現了Runnable介面的抽象類,並且在run方法中呼叫了execute()
。也就是說AsyncCal其實就是一個Runnable,當這個Runnable被呼叫的時候execute()
方法自然會被呼叫。看到這裡就很清晰了,再回過頭來看RealCall的enqueue()
中呼叫的這段程式碼
//封裝一個AsyncCall交給Dispatcher排程
client.dispatcher().enqueue(new AsyncCall(responseCallback));
其實這裡的new AsyncCall(responseCallback)
就是new了一個封裝的Runnable物件,這個Runnable的執行,就是整個請求的發起與回撥的過程。好啦,這裡搞明白了其實呼叫Dispatcher().enqueue()
方法傳遞過去的是一個Runnable物件,接下來就去Dispatcher中看下,他對這個Runnable做了什麼。
synchronized void enqueue(AsyncCall call) {
//判斷正在執行的非同步請求數沒有達到閾值,並且每一個Host的請求數也沒有達到閾值
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//加入到正在執行佇列,並立即執行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//加入到等待佇列
readyAsyncCalls.add(call);
}
}
上面的程式碼中又出現了兩個雙端佇列,runningAsyncCalls和readyAsyncCalls,加上上面出現的runningSyncCalls可以看到Dispatcher一共維護了3個請求佇列,分別是
- runningAsyncCalls,正在請求的非同步佇列
- readyAsyncCalls,準備請求的非同步佇列\等待請求的非同步佇列
- runningSyncCalls,正在請求的同步佇列
還出現了一個方法executorService()
,接下來看下這個方法是幹嘛的。
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看出來這個方法就是以懶漢的方式建立最大容量為 Integer.MAX_VALUE
, 存活等待時間為60S的執行緒池(其實這裡的最大容量並沒什麼用,因為他的最大容量不會超過runningAsyncCalls的size,即設定的併發請求數的閾值)。executorService().execute(call)
就是把這個請求丟進去執行。那麼enqueue()
方法執行的過程大概就是,首先判斷當前正在執行的非同步請求總數是否已經達到的閾值(預設為64),針對每個host的同時請求數量是否達到了閾值(預設為5)。如果都沒有達到那麼將這個請求加入到runningAsyncCalls佇列中,馬上執行。
否則,會將這個請求加入到readyAsyncCalls中,準備執行。那麼readyAsyncCalls中的請求時何時被呼叫的呢?掐指一算,應該是在runningAsyncCalls中某些請求被執行完畢時,不滿足上面的兩個條件自然會被呼叫。是不是呢?接下來看上面一直忽略的Dispatcher的**三個**finished方法:
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
//非同步請求結束時呼叫此方法
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
//同步請求結束時呼叫此方法
finished(runningSyncCalls, call, false);
}
/**
*將執行完畢的call從相應的佇列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//從相應的佇列中移除相應的call,如果不包含,拋異常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//是否需要提升Call的級別
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果沒有任何需要執行的請求,那麼執行idleCallBack
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
可以看出來不管是非同步呼叫結束,還是同步呼叫結束,最終都是呼叫的這個被private修飾的finished方法,都會將完成的call從相應的佇列中移除。唯一不同的是呼叫時傳遞的promoteCalls引數不同,非同步請求結束時傳入的是true,同步請求時結束傳入的是false。並且會根據這個flag來判斷是否執行promoteCalls()
方法,接下來看promoteCalls()
裡做了什麼。
/**
*提升call的優先順序
*/
private void promoteCalls() {
//runningAsyncCalls已經滿了,不能再加了
if (runningAsyncCalls.size() >= maxRequests) return;
//沒有請求在readyAsyncCalls等著被執行
if (readyAsyncCalls.isEmpty()) return;
//遍歷準備佇列裡的請求
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//判斷該請求的host是否小於每個host最大請求閾值
if (runningCallsForHost(call) < maxRequestsPerHost) {
//將該請求從readyAsyncCalls移除,加入runningAsyncCalls並執行
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//如果runningAsyncCalls數量已經達到閾值,終止遍歷
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
可以看出promoteCalls()
方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執行。所以上面的兩個finished方法呼叫方式的區別也就明晰了。同步呼叫結束因為並沒有涉及到runningAsyncCalls中的任何東西,對runningAsyncCalls沒任何影響,所以不需要呼叫promoteCalls。而非同步的呼叫結束意味著runningAsyncCalls中會出現一個空位值,所以它會呼叫promoteCalls去嘗試從readyAsyncCalls中拉一個進來。
總結
好啦 到這裡整個dispatcher的排程分析算是完成了。總結起來其實他就是維護了三個佇列,三個佇列中包含了正在執行或者將要執行的所有請求。總結起來就是:
- 當傳送一個非同步請求時:如果runningAsyncCalls沒達到閾值,那麼會將這個請求加入到runningAsyncCalls立即執行,否則會將這個請求加入到readyAsyncCalls中等待執行。當一個非同步請求執行完畢時會試圖去執行readyAsyncCalls中的請求。
- 當傳送一個同步請求時:該請求會直接加入到runningSyncCalls中,並且馬上開始執行,注意這個執行並不是由Dispatcher排程的。
- 所有非同步執行的請求都會通過executorService執行緒池來執行,這是個懶漢方式建立的執行緒池。