1. 程式人生 > >ThreadPoolExecutor詳解

ThreadPoolExecutor詳解

線程狀態 alt rexec 重置 log 對象傳遞 volatile 循環 實現原理

ThreadPoolExecutor顧名思義,是一個線程池管理工具類,該類主要提供了任務管理,線程的調度和相關的hook方法來控制線程池的狀態。

1.方法說明

任務管理主要方法如下:

public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public <T> Future<T> submit(Runnable task, T result);
public Future<?> submit(Runnable task);
public
void shutdown(); public List<Runnable> shutdownNow();

上述方法中,execute()和submit()方法在有空閑線程存在的情況下會立即調用該線程執行任務,區別在於execute()方法是忽略任務執行結果的,而submit()方法則可以獲取結果。除此之外,ThreadPoolExecutor還提供了shutdown()和shutdownNow()方法用於關閉線程池,區別在於shutdown()方法在調用之後會將任務隊列中的任務都執行完畢之後再關閉線程池,而shutdownNow()方法則會直接關閉線程池,並且將任務隊列中的任務導出到一個列表中返回。

除上述用於執行任務的方法外,ThreadPoolExecutor還提供了如下幾個hook(鉤子)方法:

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();

在ThreadPoolExecutor中這幾個方法默認都是空方法,beforeExecute()會在每次任務執行之前調用,afterExecute()會在每次任務結束之後調用,terminated()方法則會在線程池被終止時調用。使用這幾個方法的方式就是聲明一個子類繼承ThreadPoolExecutor,並且在子類中重寫需要定制的鉤子方法,最後在創建線程池時使用該子類實例即可。

2.任務調度

a.相關參數

對於ThreadPoolExecutor的實例化,其主要有如下幾個重要的參數:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit, BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, RejectedExecutionHandler handler);
  • corePoolSize: 線程池核心線程的數量;
  • maximumPoolSize: 線程池可創建的最大線程數量;
  • keepAliveTime: 當線程數量超過了corePoolSize指定的線程數,並且空閑線程空閑的時間達到當前參數指定的時間時該線程就會被銷毀,如果調用過allowCoreThreadTimeOut(boolean value)方法允許核心線程過期,那麽該策略針對核心線程也是生效的;
  • unit: 指定了keepAliveTime的單位,可以為毫秒,秒,分,小時等;
  • workQueue: 存儲未執行的任務的隊列;
  • threadFactory: 創建線程的工廠,如果未指定則使用默認的線程工廠;
  • handler: 指定了當任務隊列已滿,並且沒有可用線程執行任務時對新添加的任務的處理策略;
b.調度策略

當初始化一個線程池之後,池中是沒有任何用戶執行任務的活躍線程的,當新的任務到來時,根據配置的參數其主要的執行任務如下:

  • 若線程池中線程數小於corePoolSize指定的線程數時,每來一個任務,都會創建一個新的線程執行該任務,無論線程池中是否已有空閑的線程;
  • 若當前執行的任務達到了corePoolSize指定的線程數時,也即所有的核心線程都在執行任務時,此時來的新任務會保存在workQueue指定的任務隊列中;
  • 當所有的核心線程都在執行任務,並且任務隊列中存滿了任務,此時若新來了任務,那麽線程池將會創建新線程執行任務;
  • 若所有的線程(maximumPoolSize指定的線程數)都在執行任務,並且任務隊列也存滿了任務時,對於新添加的任務,其都會使用handler所指定的方式對其進行處理。
c.調度策略註意點
  • 在第二步中,當前核心線程都在執行任務,並且任務隊列已滿時,會創建新的線程執行任務,這裏需要註意的是,創建新線程的時候當前總共需要執行的任務數是(corePoolSize + workQueueSize),並不是只有corePoolSize個任務;
  • 在第三步中,這裏workQueue主要有三種類型:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,第一個是有界阻塞隊列,第二個是無界阻塞隊列,當然也可以為其指定界限大小,第三個是同步隊列,對於ArrayBlockingQueue,其是需要指定隊列大小的,當隊列存滿了任務線程池就會創建新的線程執行任務,對於LinkedBlockingQueue,如果其指定界限,那麽和ArrayBlockingQueue區別不大,如果其不指定界限,那麽其理論上是可以存儲無限量的任務的,實際上能夠存儲Integer.MAX_VALUE個任務(還是相當於可以存儲無限量的任務),此時由於LinkedBlockingQueue是永遠無法存滿任務的,因而maxPoolSize的設定將沒有意義,一般其會設定為和corePoolSize相同的值,對於SynchronousQueue,其內部是沒有任何結構存儲任務的,當一個任務添加到該隊列時,當前線程和後續添加任務的線程都會被阻塞,直至有一個線程從該隊列中取出任務,當前線程才會被釋放,因而如果線程池使用了該隊列,那麽一般corePoolSize都會設計得比較小,maxPoolSize會設計得比較大,因為該隊列比較適合大量並且執行時間較短的任務的執行;
  • 在第四步中,DiscardPolicy和DiscardOldestPolicy一般不會配合SynchronousQueue使用,因為當同步隊列阻塞了任務時,該任務都會被拋棄;對於AbortPolicy,因為如果隊列已滿,那麽其會拋出異常,因而使用時需要小心;對於CallerRunsPolicy,由於當有新的任務到達時會使用調用線程執行當前任務,因而使用時需要考慮其對服務器響應的影響,並且還需要註意的是,相對於其他幾個策略,該策略不會拋棄任務到達的任務,因為如果到達的任務使隊列滿了而只能使用調用線程執行任務時,說明線程池設計得不夠合理,如果任其發展,那麽所有的調用線程都可能會被需要執行的任務所阻塞,導致服務器出現問題。

3.源碼講解

a.主要屬性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 00011111 11111111 11111111 11111111

private static final int RUNNING    = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS; // 00100000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS; // 01100000 00000000 00000000 00000000

由於ThreadPoolExecutor需要管理多種狀態,並且還要記錄當前執行任務的線程的數量,如果使用多個變量,並發更新時管理將會非常復雜,這裏ThreadPoolExecutor則主要使用一個AtomicInteger類型的變量ctl存儲所有主要的信息。ctl是一個32位的整形數字,初始值為0,其最高的三位用於存儲當前線程池的狀態信息,主要有RUNNING,SHUTDOWN,STOP,TIDING和TERMINATED,分別表示運行狀態,關閉狀態,終止狀態,整理狀態和結束狀態。這幾種狀態對應的具體數值信息如上述代碼所示,這裏需要註意的一點是,在ThreadPoolExecutor中,這幾種狀態在數值上是從小到大依次增大的,並且狀態流轉也是依次往下的,這就為其判斷狀態信息提供了比較便利的方式,如當需要判斷線程池狀態是否處於SHUTDOWN狀態時,只需要判斷其代表狀態位部分的值是否等於SHUTDOWN即可。在ctl中,除了最高三位用於表示狀態外,其余位所代表的數值則指定了當前線程池中正在執行任務的線程數。如下是操作ctl屬性的相關方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
  return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}

private static boolean isRunning(int c) {
  return c < SHUTDOWN;
}

private boolean compareAndIncrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect - 1);
}
  • runStateOf(int c): 用於獲取當前線程池的狀態,c為當前線程池工作時的ctl屬性值;
  • workerCountOf(int c): 用於獲取當前線程池正在工作的線程數量,c為當前線程池工作時的ctl屬性值;
  • ctlOf(int rs, int wc): 這裏rs表示當前線程的工作狀態,wc則表示正在工作的線程數,該方法用於將這兩個參數組裝為一個ctl屬性值;
  • runStateLessThan(int c, int s): 判斷當前線程池狀態是否未達到指定狀態,如前所述,狀態流轉在數值上是依次增大的,因而這裏只需要判斷其大小即可;
  • runStateAtLeast(int c, int s): 用於判斷當前線程池狀態是否至少處於某種狀態;
  • isRunning(int c): 用於判斷當前線程池是否處於正常運行狀態;
  • compareAndIncrementWorkerCount(int expect): 增加當前線程池的工作線程數量值;
  • compareAndDecrementWorkerCount(int expect): 減少當前線程池的工作線程數量值。
b.主要方法

對於線程池的execute()和submit()方法,其實在底層submit()方法會將傳入的任務封裝為一個FutureTask對象,由於FutureTask對象是實現了Runnable接口的,因而其也可以當做一個任務執行,這裏就是將封裝後的FutureTask對象傳遞給execute()方法執行的。我們這裏則主要講解execute()方法的實現方式,如下是execute()方法的代碼:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();

  int c = ctl.get();    // 獲取當前線程池狀態
  if (workerCountOf(c) < corePoolSize) {
    // 當工作線程數小於核心線程數時,則調用addWorker()方法創建線程並執行任務
    if (addWorker(command, true))
      return;
    c = ctl.get();  // 若添加失敗,則更新當前線程池狀態
  }
  
  // 執行到此處,則說明線程池中的工作線程要麽大於等於核心線程數,要麽當前線程池已經被命令關閉了(addWorker方法添加失敗的原因),因而這裏判斷線程池是否為RUNNING狀態,是則將任務添加到任務隊列中
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 添加隊列成功後雙重驗證,確保線程池處於正確狀態
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);   // 若線程池中沒有線程,則創建一個新線程執行添加的任務
  } else if (!addWorker(command, false))
    reject(command);    // 線程池至少處於SHUTDOWN狀態,拒絕當前任務的執行
}

在execute()方法中,其首先判斷線程池工作線程數是否小於核心線程數,是則創建核心線程執行任務,添加失敗或者工作線程數大於等於核心線程數時,則將任務添加到任務隊列中,添加成功後會進行雙重驗證確保當前線程池處於正確的狀態,並且確保當前有可用的線程執行新添加的任務。由此可見對於execute()方法的實現,其比較核心的方法是addWorker()方法,如下是addWorker()方法的實現方式:

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c); // 獲取當前運行狀態

    // 判斷當前線程池是否至少為SHUTDOWN狀態,並且firstTask和任務隊列中沒有任務,是則直接返回
    if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
      return false;

    for (;;) {
      int wc = workerCountOf(c);
      // 判斷是否工作線程數大於可記錄的最大線程數,或者工作線程超過了指定的核心線程或者最大線程數
      if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 走到這一步說明當前線程池處於RUNNING狀態,或者任務隊列存在任務,並且工作線程數不超過
      // 指定的線程數量,那麽就增加工作線程數量,成功則繼續往下執行,失敗則重復上述添加步驟
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get();
      if (runStateOf(c) != rs)
        continue retry;
    }
  }

  // 記錄工作線程數的變量已經更新,接下來創建線程執行任務
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    w = new Worker(firstTask);  // 創建一個工作者對象
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        int rs = runStateOf(ctl.get());

        // 重新檢查線程池狀態,或者是判斷當前是SHUTDOWN狀態,而firstTask為空,這說明任務隊列此時不為空
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive())
            throw new IllegalThreadStateException();
          workers.add(w);   // 將創建的工作者添加到工作者集合中
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;    // 更新已使用的最大線程數
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();  // 工作者對象成功創建之後,調用該工作者執行任務
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

在addWorker()方法中,其首先檢查當前線程池是否處於RUNNING狀態,或者處於SHUTDOWN狀態,但是任務隊列中還存在有任務,那麽其就會創建一個新的Worker對象,並且將其添加到工作者對象集合中,然後調用工作者對象所維護的線程執行任務,如下是工作者對象的實現代碼:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  private static final long serialVersionUID = 6138294804551838833L;
  final Thread thread;  // 當前工作者中執行任務的線程
  Runnable firstTask;   // 第一個需要執行的任務
  volatile long completedTasks; // 當前工作者完成的任務數

  Worker(Runnable firstTask) {
    // 默認設置為-1,那麽如果不調用當前工作者的run()方法,那麽其狀態是不會改變的,
    // 其他的線程也無法使用當前工作者執行任務,在run()方法調用的runWorker()方法中會
    // 調用unlock()方法使當前工作者處於正常狀態
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);   // 使用線程工廠創建線程
  }

  public void run() {
    runWorker(this);    // 使用當前工作者執行任務
  }

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  // 如果當前線程已經在執行任務,那麽將其標記為打斷狀態,待其任務執行完畢則終止任務的執行
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

在工作者對象中,其主要維護了一個工作者線程,用於執行任務。該工作者對象繼承了AbstractQueuedSynchronizer,用於控制當前工作者工作狀態的獲取,並且其也實現了Runnable接口,將主要任務的執行封裝到run()方法中。如下是runWorker()方法的具體實現:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock();   // 重置Worker對象的狀態
  boolean completedAbruptly = true;
  try {
    // 首先執行工作者線程中的任務,然後循環從任務隊列中獲取任務執行
    while (task != null || (task = getTask()) != null) {
      w.lock();
      // 檢查當前線程池的狀態,如果線程池被終止或者線程池終止並且當前線程已被打斷
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
        wt.interrupt();
      try {
        beforeExecute(wt, task);    // 調用鉤子方法進行預處理
        Throwable thrown = null;
        try {
          task.run();   // 執行任務
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          afterExecute(task, thrown);   // 調用鉤子方法進行任務完成後的處理工作
        }
      } finally {
        task = null;    // 重置工作者的初始任務
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

可以看到,在runWorker()方法中,其首先會執行工作者對象的初始化任務,當執行完畢後會通過一個無限循環不斷在任務隊列中獲取任務執行。如下是getTask()方法的源碼:

private Runnable getTask() {
  boolean timedOut = false;

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 判斷當前線程是否處於STOP狀態,或者處於SHUTDOWN狀態,並且工作隊列是空的,是則不返回任務
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    int wc = workerCountOf(c);
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;    // 是否允許空閑線程過期

    // 工作線程數大於最大允許線程數,或者線程在指定時間內無法從工作隊列中獲取到新任務,則銷毀當前線程
    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 允許核心線程過期或者工作線程數大於corePoolSize時,從任務隊列獲取任務時會指定等待時間,
      // 否則會一直等待任務隊列中新的任務
      Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

可以看到,getTask方法首先會判斷當前線程池狀態是否為STOP狀態,或者是SHUTDOWN狀態,並且任務隊列是空的,是則不返回任務,否則會根據相關參數從任務隊列中獲取任務執行。

以上execute()方法的主要實現步驟,在ThreadPoolExecutor中另一個至關重要的方法則是shutdown()方法,以下是shutdown()方法的主要代碼:

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();  // 檢查對線程狀態的控制權限
    advanceRunState(SHUTDOWN);  // 更新當前線程池狀態為SHUTDOWN
    interruptIdleWorkers(); // 打斷空閑的工作者
    onShutdown();   // 鉤子方法,但是沒有對外公開,因為該方法只有包訪問權限
  } finally {
    mainLock.unlock();
  }
  tryTerminate();   
}

在shutdown()方法中,其首先檢查當前線程是否有修改線程狀態的權限,然後將當前線程池的狀態修改為SHUTDOWN,接著調用interruptIdleWorkers()方法中斷所有處於空閑狀態的線程,最後則是調用tryTerminate()方法嘗試將當前線程池的狀態由SHUTDOWN修改為TERMINATED,這裏interruptIdleWorkers()方法最終會調用其重載方法interruptIdleWorkers(boolean)方法,該方法代碼如下:

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

可以看到,該方法會遍歷所有的工作者對象,如果其處於空閑狀態,則將其終止。對於處於工作狀態的線程,由於在shutdown()方法中已經將當前線程池的狀態設置為SHUTDOWN,那麽工作狀態的線程會將任務隊列中的任務都執行完畢之後自動銷毀。

本文主要講解了ThreadPoolExecutor的主要方法,線程池的調度方式,以及其核心功能的實現原理,如本文有任何不當之處,敬請指正,謝謝!

ThreadPoolExecutor詳解