1. 程式人生 > >OkHttp Dispatcher的排程過程分析

OkHttp Dispatcher的排程過程分析

Dispatcher是負責對okhttp所有的請求進行排程管理的類。可以通過Dispatcher獲取,或者取消所有請求。這裡指的一個請求就是對應的Call並不是Request,下面出現的所有的請求都是指Call。這裡通過分析跟蹤okhttp傳送請求的過程來分析Dispatcher是如何維護和排程我們發出的所有請求的。

Call其實就是對Request的封裝。

OkHttp請求方式

通過okhttp傳送請求主要有兩種方式。

  1. 通過execute()呼叫,此時request會被馬上發出, 直到返回response或者發生錯誤前會一直阻塞。可以理解為一個立即執行的同步請求。
  2. 通過enqueue()
    呼叫,此時request將會在未來的某個時間點被執行,具體由dispatcher進行排程,這種方式是非同步返回結果的。可以理解為會被儘快執行的一個非同步請求。

第一種方式

通過execute()呼叫,一般是這樣的

okHttpClient.newCall(request).execute();

通過OkHttpClient的程式碼可以看出newCall()方法其實是new了一個RealCall,所以這裡直接檢視RealCallexecute()方法。

@Override public Call newCall(Request request) {
  return new
RealCall(this, request, false /* for web socket */); }

RealCallexecute()方法,這裡只看下核心的程式碼:

@Override public Response execute() throws IOException {
  synchronized (this) {
  //此處除去一些其他程式碼
  //...
  try {
    //通知Dispatcher這個Call正在被執行,同時將此Call交給Dispatcher
    //Dispatcher可以對此Call進行管理
    client.dispatcher().executed(this
); //請求的過程,注意這行程式碼是阻塞的,直到返回result! Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { //此時這個請求已經執行完畢了,通知Dispatcher,不要再維護這個Call了 client.dispatcher().finished(this); } }

首先注意這行程式碼

client.dispatcher().executed(this);

它是呼叫的Dispatcher的executed()方法,注意看方法名是executed並不是execute。接下來去Dispatcher裡看下這個方法做了什麼。

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

看註釋就明白了,這裡他只是一個通知的作用,通知Dispatcher我這個call立即要被或者正在被執行,然後Dispatcher會把加入一個名為runningSyncCalls的雙端佇列中,這個佇列中儲存著所有的正在執行的同步請求。這樣Dispatcher就可以很方便的對所有的同步請求進行管理了。既然有新增,那麼也應該有刪除,在請求執行完畢時呼叫了這行程式碼:

client.dispatcher().finished(this);

通過字面意思理解他應該就是刪除的操作,通知Dispatcher這個請求已經被執行完畢了。這裡暫時理解為呼叫finished方法就是將此call從runningSyncCalls中移除,後面會再討論finished方法的細節。

因為同步請求是被馬上執行的,所以Dispatcher能對同步請求進行的排程也只有cancel了。具體可以通過呼叫Dispatcher.cancelAll()方法進行取消。

所以真正執行請求的只有這行程式碼了。

Response result = getResponseWithInterceptorChain();

這個方法先不管他,就可以理解為這行程式碼的執行就是請求從發出到完成的過程。在分析攔截器的實現原理的時候再來討論。

第二種方式

client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {

    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {

    }
});

通過上面我們已經知道這裡呼叫的也是RealCall的enqueue方法,我們直接來看程式碼:

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
  //判斷是否已經執行過了
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  //捕獲呼叫棧的資訊,用來分析連線洩露
  captureCallStackTrace();
  //封裝一個AsyncCall交給Dispatcher排程
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

通過上面的程式碼可以看出呼叫enqueue()方法,其實是呼叫了Dispatcher的enqueue()方法,並且new了一個AsyncCall作為引數。AsyncCall為RealCall的一個內部類,下面繼續看AsyncCall類裡到底做了什麼。

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    //...
    /**
    *真正執行發出請求的地方,為了看起來清晰,精簡了部分程式碼
    */
    @Override protected void execute() {
      try {
        //請求的過程,注意這裡也是阻塞的
        Response response = getResponseWithInterceptorChain();
        //先不管這個Interceptor是幹嘛的,下面的程式碼可以理解為:
        //如果沒有被取消,並且沒有發生異常,回撥onResponse方法。
        //如果發生了異常或者被取消,回撥onFailure方法。
        if (retryAndFollowUpInterceptor.isCanceled()) {
          //此請求被取消了,回撥onFailure
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          //此請求成功了,回撥onResponse
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
          //發生了異常,回撥onFailure
          responseCallback.onFailure(RealCall.this, e);
      } finally {
        //通知Dispatcher Call被執行完畢了
        client.dispatcher().finished(this);
      }
    }
  }

可以看到AsyncCall的execute()就是具體請求執行的地方,只不過和上面的RealCall的execute()方法相比,多了回撥的處理。retryAndFollowUpInterceptor其實是負責請求超時的重試和重定向操作的,retryAndFollowUpInterceptor.isCanceled()就是用來判斷這個請求是否被取消了,這裡就不深入展開了。那麼AsyncCall的execute()方法是怎麼被執行的呢,繼續來看AsyncCall的父類NamedRunnable。

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //注意這裡呼叫了execute方法
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

可以看到NamedRunnable其實就是一個實現了Runnable介面的抽象類,並且在run方法中呼叫了execute()。也就是說AsyncCal其實就是一個Runnable,當這個Runnable被呼叫的時候execute()方法自然會被呼叫。看到這裡就很清晰了,再回過頭來看RealCall的enqueue()中呼叫的這段程式碼

//封裝一個AsyncCall交給Dispatcher排程
client.dispatcher().enqueue(new AsyncCall(responseCallback));

其實這裡的new AsyncCall(responseCallback)就是new了一個封裝的Runnable物件,這個Runnable的執行,就是整個請求的發起與回撥的過程。好啦,這裡搞明白了其實呼叫Dispatcher().enqueue()方法傳遞過去的是一個Runnable物件,接下來就去Dispatcher中看下,他對這個Runnable做了什麼。

synchronized void enqueue(AsyncCall call) {
  //判斷正在執行的非同步請求數沒有達到閾值,並且每一個Host的請求數也沒有達到閾值
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //加入到正在執行佇列,並立即執行
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    //加入到等待佇列
    readyAsyncCalls.add(call);
  }
}

上面的程式碼中又出現了兩個雙端佇列,runningAsyncCalls和readyAsyncCalls,加上上面出現的runningSyncCalls可以看到Dispatcher一共維護了3個請求佇列,分別是

  1. runningAsyncCalls,正在請求的非同步佇列
  2. readyAsyncCalls,準備請求的非同步佇列\等待請求的非同步佇列
  3. runningSyncCalls,正在請求的同步佇列

還出現了一個方法executorService(),接下來看下這個方法是幹嘛的。

private ExecutorService executorService;
public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

可以看出來這個方法就是以懶漢的方式建立最大容量為 Integer.MAX_VALUE, 存活等待時間為60S的執行緒池(其實這裡的最大容量並沒什麼用,因為他的最大容量不會超過runningAsyncCalls的size,即設定的併發請求數的閾值)。executorService().execute(call)就是把這個請求丟進去執行。那麼enqueue()方法執行的過程大概就是,首先判斷當前正在執行的非同步請求總數是否已經達到的閾值(預設為64),針對每個host的同時請求數量是否達到了閾值(預設為5)。如果都沒有達到那麼將這個請求加入到runningAsyncCalls佇列中,馬上執行。

否則,會將這個請求加入到readyAsyncCalls中,準備執行。那麼readyAsyncCalls中的請求時何時被呼叫的呢?掐指一算,應該是在runningAsyncCalls中某些請求被執行完畢時,不滿足上面的兩個條件自然會被呼叫。是不是呢?接下來看上面一直忽略的Dispatcher的**三個**finished方法:

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  //非同步請求結束時呼叫此方法
  finished(runningAsyncCalls, call, true);
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  //同步請求結束時呼叫此方法
  finished(runningSyncCalls, call, false);
}
/**
*將執行完畢的call從相應的佇列移除
*/
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    //從相應的佇列中移除相應的call,如果不包含,拋異常
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    //是否需要提升Call的級別
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }
    //如果沒有任何需要執行的請求,那麼執行idleCallBack
  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

可以看出來不管是非同步呼叫結束,還是同步呼叫結束,最終都是呼叫的這個被private修飾的finished方法,都會將完成的call從相應的佇列中移除。唯一不同的是呼叫時傳遞的promoteCalls引數不同,非同步請求結束時傳入的是true,同步請求時結束傳入的是false。並且會根據這個flag來判斷是否執行promoteCalls()方法,接下來看promoteCalls()裡做了什麼。

/**
*提升call的優先順序
*/
private void promoteCalls() {
  //runningAsyncCalls已經滿了,不能再加了
  if (runningAsyncCalls.size() >= maxRequests) return; 
  //沒有請求在readyAsyncCalls等著被執行
  if (readyAsyncCalls.isEmpty()) return; 
  //遍歷準備佇列裡的請求
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();
    //判斷該請求的host是否小於每個host最大請求閾值
    if (runningCallsForHost(call) < maxRequestsPerHost) {
      //將該請求從readyAsyncCalls移除,加入runningAsyncCalls並執行
      i.remove();
      runningAsyncCalls.add(call);
      executorService().execute(call);
    }
    //如果runningAsyncCalls數量已經達到閾值,終止遍歷
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
}

可以看出promoteCalls()方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執行。所以上面的兩個finished方法呼叫方式的區別也就明晰了。同步呼叫結束因為並沒有涉及到runningAsyncCalls中的任何東西,對runningAsyncCalls沒任何影響,所以不需要呼叫promoteCalls。而非同步的呼叫結束意味著runningAsyncCalls中會出現一個空位值,所以它會呼叫promoteCalls去嘗試從readyAsyncCalls中拉一個進來。

總結

好啦 到這裡整個dispatcher的排程分析算是完成了。總結起來其實他就是維護了三個佇列,三個佇列中包含了正在執行或者將要執行的所有請求。總結起來就是:

  1. 當傳送一個非同步請求時:如果runningAsyncCalls沒達到閾值,那麼會將這個請求加入到runningAsyncCalls立即執行,否則會將這個請求加入到readyAsyncCalls中等待執行。當一個非同步請求執行完畢時會試圖去執行readyAsyncCalls中的請求。
  2. 當傳送一個同步請求時:該請求會直接加入到runningSyncCalls中,並且馬上開始執行,注意這個執行並不是由Dispatcher排程的。
  3. 所有非同步執行的請求都會通過executorService執行緒池來執行,這是個懶漢方式建立的執行緒池。
整個大致的流程