1. 程式人生 > 實用技巧 >深度解讀 java 執行緒池設計思想及原始碼實現

深度解讀 java 執行緒池設計思想及原始碼實現

轉自

https://javadoop.com/2017/09/05/java-thread-pool/hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

我相信大家都看過很多的關於執行緒池的文章,基本上也是面試必問的,好像我寫這篇文章其實是沒有什麼意義的,不過,我相信你也和我一樣,看了很多文章還是一知半解,甚至可能看了很多瞎說的文章。希望大家看過這篇文章以後,就可以完全掌握 Java 執行緒池了。

我發現好些人都是因為這篇文章來到本站的,希望這篇讓人留下第一眼印象的文章能給你帶來收穫。

本文一大重點是原始碼解析,不過執行緒池設計思想以及作者實現過程中的一些巧妙用法是我想傳達給讀者的。本文還是會一行行關鍵程式碼進行分析,目的是為了讓那些自己看原始碼不是很理解的同學可以得到參考。

執行緒池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術水平。

本文略長,建議在 pc 上閱讀,邊看文章邊翻原始碼(Java7 和 Java8 都一樣),建議想好好看的讀者抽出至少 15 至 30 分鐘的整塊時間來閱讀。當然,如果讀者僅為面試準備,可以直接滑到最後的總結部分。

目錄

總覽

開篇來一些廢話。下圖是 java 執行緒池幾個相關類的繼承結構:

先簡單說說這個繼承結構,Executor 位於最頂層,也是最簡單的,就一個 execute(Runnable runnable) 介面方法定義。

ExecutorService 也是介面,在 Executor 介面的基礎上添加了很多的介面方法,所以一般來說我們會使用這個介面。

然後再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這裡實現了非常有用的一些方法供子類直接使用,之後我們再細說。

然後才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關於執行緒池所需的非常豐富的功能。

另外,我們還涉及到下圖中的這些類:

同在併發包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,裡面的方法都是靜態方法,如以下我們最常用的用於生成 ThreadPoolExecutor 的例項的一些方法:

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }
  6. public static ExecutorService newFixedThreadPool(int nThreads) {
  7. return new ThreadPoolExecutor(nThreads, nThreads,
  8. 0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue<Runnable>());
  10. }

另外,由於執行緒池支援獲取執行緒執行的結果,所以,引入了 Future 介面,RunnableFuture 繼承自此介面,然後我們最需要關心的就是它的實現類 FutureTask。到這裡,記住這個概念,線上程池的使用過程中,我們是往執行緒池提交任務(task),使用過執行緒池的都知道,我們提交的每個任務是實現了 Runnable 介面的,其實就是先將 Runnable 的任務包裝成 FutureTask,然後再提交到執行緒池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),然後具有 Future 介面的語義,即可以在將來(Future)得到執行的結果。

當然,執行緒池中的 BlockingQueue 也是非常重要的概念,如果執行緒數達到 corePoolSize,我們的每個任務會提交到等待佇列中,等待執行緒池中的執行緒來取任務並執行。這裡的 BlockingQueue 通常我們使用其實現類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實現類都有不同的特徵,使用場景之後會慢慢分析。想要詳細瞭解各個 BlockingQueue 的讀者,可以參考我的前面的一篇對 BlockingQueue 的各個實現類進行詳細分析的文章。

把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務實現類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用於實現定時執行。不過本文不會介紹它的實現,我相信讀者看完本文後可以比較容易地看懂它的原始碼。

以上就是本文要介紹的知識,廢話不多說,開始進入正文。

Executor 介面

  1. /*
  2. * @since 1.5
  3. * @author Doug Lea
  4. */
  5. public interface Executor {
  6. void execute(Runnable command);
  7. }

我們可以看到 Executor 介面非常簡單,就一個void execute(Runnable command)方法,代表提交一個任務。為了讓大家理解 java 執行緒池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。

我們經常這樣啟動一個執行緒:

  1. new Thread(new Runnable(){
  2. // do something
  3. }).start();

用了執行緒池 Executor 後就可以像下面這麼使用:

  1. Executor executor = anExecutor;
  2. executor.execute(new RunnableTask1());
  3. executor.execute(new RunnableTask2());

如果我們希望執行緒池同步執行每一個任務,我們可以這麼實現這個介面:

  1. class DirectExecutor implements Executor {
  2. public void execute(Runnable r) {
  3. r.run();// 這裡不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的執行緒。
  4. }
  5. }

我們希望每個任務提交進來後,直接啟動一個新的執行緒來執行這個任務,我們可以這麼實現:

  1. class ThreadPerTaskExecutor implements Executor {
  2. public void execute(Runnable r) {
  3. new Thread(r).start(); // 每個任務都用一個新的執行緒來執行
  4. }
  5. }

我們再來看下怎麼組合兩個 Executor 來使用,下面這個實現是將所有的任務都加到一個 queue 中,然後從 queue 中取任務,交給真正的執行器執行,這裡採用 synchronized 進行併發控制:

  1. class SerialExecutor implements Executor {
  2. // 任務佇列
  3. final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  4. // 這個才是真正的執行器
  5. final Executor executor;
  6. // 當前正在執行的任務
  7. Runnable active;
  8. // 初始化的時候,指定執行器
  9. SerialExecutor(Executor executor) {
  10. this.executor = executor;
  11. }
  12. // 新增任務到執行緒池: 將任務新增到任務佇列,scheduleNext 觸發執行器去任務佇列取任務
  13. public synchronized void execute(final Runnable r) {
  14. tasks.offer(new Runnable() {
  15. public void run() {
  16. try {
  17. r.run();
  18. } finally {
  19. scheduleNext();
  20. }
  21. }
  22. });
  23. if (active == null) {
  24. scheduleNext();
  25. }
  26. }
  27. protected synchronized void scheduleNext() {
  28. if ((active = tasks.poll()) != null) {
  29. // 具體的執行轉給真正的執行器 executor
  30. executor.execute(active);
  31. }
  32. }
  33. }

當然了,Executor 這個介面只有提交任務的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執行結果、我們想知道當前執行緒池有多少個執行緒活著、已經完成了多少任務等等,這些都是這個介面的不足的地方。接下來我們要介紹的是繼承自Executor介面的ExecutorService介面,這個介面提供了比較豐富的功能,也是我們最常使用到的介面。

ExecutorService

一般我們定義一個執行緒池的時候,往往都是使用這個介面:

  1. ExecutorService executor = Executors.newFixedThreadPool(args...);
  2. ExecutorService executor = Executors.newCachedThreadPool(args...);

因為這個介面中定義的一系列方法大部分情況下已經可以滿足我們的需要了。

那麼我們簡單初略地來看一下這個介面中都有哪些方法:

  1. public interface ExecutorService extends Executor {
  2. // 關閉執行緒池,已提交的任務繼續執行,不接受繼續提交新任務
  3. void shutdown();
  4. // 關閉執行緒池,嘗試停止正在執行的所有任務,不接受繼續提交新任務
  5. // 它和前面的方法相比,加了一個單詞“now”,區別在於它會去停止當前正在進行的任務
  6. List<Runnable> shutdownNow();
  7. // 執行緒池是否已關閉
  8. boolean isShutdown();
  9. // 如果呼叫了 shutdown() 或 shutdownNow() 方法後,所有任務結束了,那麼返回true
  10. // 這個方法必須在呼叫shutdown或shutdownNow方法之後呼叫才會返回true
  11. boolean isTerminated();
  12. // 等待所有任務完成,並設定超時時間
  13. // 我們這麼理解,實際應用中是,先呼叫 shutdown 或 shutdownNow,
  14. // 然後再調這個方法等待所有的執行緒真正地完成,返回值意味著有沒有超時
  15. boolean awaitTermination(long timeout, TimeUnit unit)
  16. throws InterruptedException;
  17. // 提交一個 Callable 任務
  18. <T> Future<T> submit(Callable<T> task);
  19. // 提交一個 Runnable 任務,第二個引數將會放到 Future 中,作為返回值,
  20. // 因為 Runnable 的 run 方法本身並不返回任何東西
  21. <T> Future<T> submit(Runnable task, T result);
  22. // 提交一個 Runnable 任務
  23. Future<?> submit(Runnable task);
  24. // 執行所有任務,返回 Future 型別的一個 list
  25. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  26. throws InterruptedException;
  27. // 也是執行所有任務,但是這裡設定了超時時間
  28. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  29. long timeout, TimeUnit unit)
  30. throws InterruptedException;
  31. // 只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果
  32. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  33. throws InterruptedException, ExecutionException;
  34. // 同上一個方法,只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果,
  35. // 不過這個帶超時,超過指定的時間,丟擲 TimeoutException 異常
  36. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  37. long timeout, TimeUnit unit)
  38. throws InterruptedException, ExecutionException, TimeoutException;
  39. }

這些方法都很好理解,一個簡單的執行緒池主要就是這些功能,能提交任務,能獲取結果,能關閉執行緒池,這也是為什麼我們經常用這個介面的原因。

FutureTask

在繼續往下層介紹 ExecutorService 的實現類之前,我們先來說說相關的類 FutureTask。

  1. Future -> RunnableFuture -> FutureTask
  2. Runnable -> RunnableFuture
  3. FutureTask 通過 RunnableFuture 間接實現了 Runnable 介面,
  4. 所以每個 Runnable 通常都先包裝成 FutureTask,
  5. 然後呼叫 executor.execute(Runnable command) 將其提交給執行緒池

我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在 submit 中指定第二個引數作為返回值:

<T> Future<T> submit(Runnable task, T result);

其實到時候會通過這兩個引數,將其包裝成 Callable。

Callable 也是因為執行緒池的需要,所以才有了這個介面。它和 Runnable 的區別在於 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,如果執行出現異常,call() 方法會丟擲異常。

  1. public interface Callable<V> {
  2. V call() throws Exception;
  3. }

在這裡,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這裡我們需要知道怎麼用就行了。

下面,我們來看看ExecutorService的抽象實現AbstractExecutorService

AbstractExecutorService

AbstractExecutorService 抽象類派生自 ExecutorService 介面,然後在其基礎上實現了幾個實用的方法,這些方法提供給子類進行呼叫。

這個抽象類實現了 invokeAny 方法和 invokeAll 方法,這裡的兩個 newTaskFor 方法也比較有用,用於將任務包裝成 FutureTask。定義於最上層介面 Executor中的void execute(Runnable command)由於不需要獲取結果,不會進行 FutureTask 的包裝。

需要獲取結果(FutureTask),用 submit 方法,不需要獲取結果,可以用 execute 方法。

下面,我將一行一行原始碼地來分析這個類,跟著原始碼來看看其實現吧:

Tips: invokeAny 和 invokeAll 方法佔了這整個類的絕大多數篇幅,讀者可以選擇適當跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟後的作用,不用擔心會漏掉什麼導致看不懂後面的程式碼。

  1. public abstract class AbstractExecutorService implements ExecutorService {
  2. // RunnableFuture 是用於獲取執行結果的,我們常用它的子類 FutureTask
  3. // 下面兩個 newTaskFor 方法用於將我們的任務包裝成 FutureTask 提交到執行緒池中執行
  4. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
  5. return new FutureTask<T>(runnable, value);
  6. }
  7. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  8. return new FutureTask<T>(callable);
  9. }
  10. // 提交任務
  11. public Future<?> submit(Runnable task) {
  12. if (task == null) throw new NullPointerException();
  13. // 1. 將任務包裝成 FutureTask
  14. RunnableFuture<Void> ftask = newTaskFor(task, null);
  15. // 2. 交給執行器執行,execute 方法由具體的子類來實現
  16. // 前面也說了,FutureTask 間接實現了Runnable 介面。
  17. execute(ftask);
  18. return ftask;
  19. }
  20. public <T> Future<T> submit(Runnable task, T result) {
  21. if (task == null) throw new NullPointerException();
  22. // 1. 將任務包裝成 FutureTask
  23. RunnableFuture<T> ftask = newTaskFor(task, result);
  24. // 2. 交給執行器執行
  25. execute(ftask);
  26. return ftask;
  27. }
  28. public <T> Future<T> submit(Callable<T> task) {
  29. if (task == null) throw new NullPointerException();
  30. // 1. 將任務包裝成 FutureTask
  31. RunnableFuture<T> ftask = newTaskFor(task);
  32. // 2. 交給執行器執行
  33. execute(ftask);
  34. return ftask;
  35. }
  36. // 此方法目的:將 tasks 集合中的任務提交到執行緒池執行,任意一個執行緒執行完後就可以結束了
  37. // 第二個引數 timed 代表是否設定超時機制,超時時間為第三個引數,
  38. // 如果 timed 為 true,同時超時了還沒有一個執行緒返回結果,那麼丟擲 TimeoutException 異常
  39. private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
  40. boolean timed, long nanos)
  41. throws InterruptedException, ExecutionException, TimeoutException {
  42. if (tasks == null)
  43. throw new NullPointerException();
  44. // 任務數
  45. int ntasks = tasks.size();
  46. if (ntasks == 0)
  47. throw new IllegalArgumentException();
  48. //
  49. List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
  50. // ExecutorCompletionService 不是一個真正的執行器,引數 this 才是真正的執行器
  51. // 它對執行器進行了包裝,每個任務結束後,將結果儲存到內部的一個 completionQueue 佇列中
  52. // 這也是為什麼這個類的名字裡面有個 Completion 的原因吧。
  53. ExecutorCompletionService<T> ecs =
  54. new ExecutorCompletionService<T>(this);
  55. try {
  56. // 用於儲存異常資訊,此方法如果沒有得到任何有效的結果,那麼我們可以丟擲最後得到的一個異常
  57. ExecutionException ee = null;
  58. long lastTime = timed ? System.nanoTime() : 0;
  59. Iterator<? extends Callable<T>> it = tasks.iterator();
  60. // 首先先提交一個任務,後面的任務到下面的 for 迴圈一個個提交
  61. futures.add(ecs.submit(it.next()));
  62. // 提交了一個任務,所以任務數量減 1
  63. --ntasks;
  64. // 正在執行的任務數(提交的時候 +1,任務結束的時候 -1)
  65. int active = 1;
  66. for (;;) {
  67. // ecs 上面說了,其內部有一個 completionQueue 用於儲存執行完成的結果
  68. // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空
  69. Future<T> f = ecs.poll();
  70. // 為 null,說明剛剛提交的第一個執行緒還沒有執行完成
  71. // 在前面先提交一個任務,加上這裡做一次檢查,也是為了提高效能
  72. if (f == null) {
  73. if (ntasks > 0) {
  74. --ntasks;
  75. futures.add(ecs.submit(it.next()));
  76. ++active;
  77. }
  78. // 這裡是 else if,不是 if。這裡說明,沒有任務了,同時 active 為 0 說明
  79. // 任務都執行完成了。其實我也沒理解為什麼這裡做一次 break?
  80. // 因為我認為 active 為 0 的情況,必然從下面的 f.get() 返回了
  81. // 2018-02-23 感謝讀者 newmicro 的 comment,
  82. // 這裡的 active == 0,說明所有的任務都執行失敗,那麼這裡是 for 迴圈出口
  83. else if (active == 0)
  84. break;
  85. // 這裡也是 else if。這裡說的是,沒有任務了,但是設定了超時時間,這裡檢測是否超時
  86. else if (timed) {
  87. // 帶等待的 poll 方法
  88. f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
  89. // 如果已經超時,丟擲 TimeoutException 異常,這整個方法就結束了
  90. if (f == null)
  91. throw new TimeoutException();
  92. long now = System.nanoTime();
  93. nanos -= now - lastTime;
  94. lastTime = now;
  95. }
  96. // 這裡是 else。說明,沒有任務需要提交,但是池中的任務沒有完成,還沒有超時(如果設定了超時)
  97. // take() 方法會阻塞,直到有元素返回,說明有任務結束了
  98. else
  99. f = ecs.take();
  100. }
  101. /*
  102. * 我感覺上面這一段並不是很好理解,這裡簡單說下。
  103. * 1. 首先,這在一個 for 迴圈中,我們設想每一個任務都沒那麼快結束,
  104. * 那麼,每一次都會進到第一個分支,進行提交任務,直到將所有的任務都提交了
  105. * 2. 任務都提交完成後,如果設定了超時,那麼 for 迴圈其實進入了“一直檢測是否超時”
  106. 這件事情上
  107. * 3. 如果沒有設定超時機制,那麼不必要檢測超時,那就會阻塞在 ecs.take() 方法上,
  108. 等待獲取第一個執行結果
  109. * 4. 如果所有的任務都執行失敗,也就是說 future 都返回了,
  110. 但是 f.get() 丟擲異常,那麼從 active == 0 分支出去(感謝 newmicro 提出)
  111. // 當然,這個需要看下面的 if 分支。
  112. */
  113. // 有任務結束了
  114. if (f != null) {
  115. --active;
  116. try {
  117. // 返回執行結果,如果有異常,都包裝成 ExecutionException
  118. return f.get();
  119. } catch (ExecutionException eex) {
  120. ee = eex;
  121. } catch (RuntimeException rex) {
  122. ee = new ExecutionException(rex);
  123. }
  124. }
  125. }// 注意看 for 迴圈的範圍,一直到這裡
  126. if (ee == null)
  127. ee = new ExecutionException();
  128. throw ee;
  129. } finally {
  130. // 方法退出之前,取消其他的任務
  131. for (Future<T> f : futures)
  132. f.cancel(true);
  133. }
  134. }
  135. public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  136. throws InterruptedException, ExecutionException {
  137. try {
  138. return doInvokeAny(tasks, false, 0);
  139. } catch (TimeoutException cannotHappen) {
  140. assert false;
  141. return null;
  142. }
  143. }
  144. public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
  145. long timeout, TimeUnit unit)
  146. throws InterruptedException, ExecutionException, TimeoutException {
  147. return doInvokeAny(tasks, true, unit.toNanos(timeout));
  148. }
  149. // 執行所有的任務,返回任務結果。
  150. // 先不要看這個方法,我們先想想,其實我們自己提交任務到執行緒池,也是想要執行緒池執行所有的任務
  151. // 只不過,我們是每次 submit 一個任務,這裡以一個集合作為引數提交
  152. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  153. throws InterruptedException {
  154. if (tasks == null)
  155. throw new NullPointerException();
  156. List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  157. boolean done = false;
  158. try {
  159. // 這個很簡單
  160. for (Callable<T> t : tasks) {
  161. // 包裝成 FutureTask
  162. RunnableFuture<T> f = newTaskFor(t);
  163. futures.add(f);
  164. // 提交任務
  165. execute(f);
  166. }
  167. for (Future<T> f : futures) {
  168. if (!f.isDone()) {
  169. try {
  170. // 這是一個阻塞方法,直到獲取到值,或丟擲了異常
  171. // 這裡有個小細節,其實 get 方法簽名上是會丟擲 InterruptedException 的
  172. // 可是這裡沒有進行處理,而是拋給外層去了。此異常發生於還沒執行完的任務被取消了
  173. f.get();
  174. } catch (CancellationException ignore) {
  175. } catch (ExecutionException ignore) {
  176. }
  177. }
  178. }
  179. done = true;
  180. // 這個方法返回,不像其他的場景,返回 List<Future>,其實執行結果還沒出來
  181. // 這個方法返回是真正的返回,任務都結束了
  182. return futures;
  183. } finally {
  184. // 為什麼要這個?就是上面說的有異常的情況
  185. if (!done)
  186. for (Future<T> f : futures)
  187. f.cancel(true);
  188. }
  189. }
  190. // 帶超時的 invokeAll,我們找不同吧
  191. public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  192. long timeout, TimeUnit unit)
  193. throws InterruptedException {
  194. if (tasks == null || unit == null)
  195. throw new NullPointerException();
  196. long nanos = unit.toNanos(timeout);
  197. List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  198. boolean done = false;
  199. try {
  200. for (Callable<T> t : tasks)
  201. futures.add(newTaskFor(t));
  202. long lastTime = System.nanoTime();
  203. Iterator<Future<T>> it = futures.iterator();
  204. // 提交一個任務,檢測一次是否超時
  205. while (it.hasNext()) {
  206. execute((Runnable)(it.next()));
  207. long now = System.nanoTime();
  208. nanos -= now - lastTime;
  209. lastTime = now;
  210. // 超時
  211. if (nanos <= 0)
  212. return futures;
  213. }
  214. for (Future<T> f : futures) {
  215. if (!f.isDone()) {
  216. if (nanos <= 0)
  217. return futures;
  218. try {
  219. // 呼叫帶超時的 get 方法,這裡的引數 nanos 是剩餘的時間,
  220. // 因為上面其實已經用掉了一些時間了
  221. f.get(nanos, TimeUnit.NANOSECONDS);
  222. } catch (CancellationException ignore) {
  223. } catch (ExecutionException ignore) {
  224. } catch (TimeoutException toe) {
  225. return futures;
  226. }
  227. long now = System.nanoTime();
  228. nanos -= now - lastTime;
  229. lastTime = now;
  230. }
  231. }
  232. done = true;
  233. return futures;
  234. } finally {
  235. if (!done)
  236. for (Future<T> f : futures)
  237. f.cancel(true);
  238. }
  239. }
  240. }

到這裡,我們發現,這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟執行緒來執行任務,它們都只是在方法內部呼叫了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現,需要等具體執行器來實現這個最重要的部分,這裡我們要說的就是 ThreadPoolExecutor 類了。

鑑於本文的篇幅,我覺得看到這裡的讀者應該已經不多了,快餐文化使然啊!我寫的每篇文章都力求讓讀者可以通過我的一篇文章而記住所有的相關知識點,所以篇幅不免長了些。其實,工作了很多年的話,會有一個感覺,比如說執行緒池,即使看了 20 篇各種總結,也不如一篇長文實實在在講解清楚每一個知識點,有點少即是多,多即是少的意味了。

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的執行緒池實現,這個類實現了一個執行緒池需要的各個方法,它實現了任務提交、執行緒管理、監控等等方法。

我們可以基於它來進行業務上的擴充套件,以實現我們需要的其他功能,比如實現定時任務的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當然,這不是本文關注的重點,下面,還是趕緊進行原始碼分析吧。

首先,我們來看看執行緒池實現中的幾個概念和處理流程。

我們先回顧下提交任務的幾個方法:

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. public <T> Future<T> submit(Runnable task, T result) {
  8. if (task == null) throw new NullPointerException();
  9. RunnableFuture<T> ftask = newTaskFor(task, result);
  10. execute(ftask);
  11. return ftask;
  12. }
  13. public <T> Future<T> submit(Callable<T> task) {
  14. if (task == null) throw new NullPointerException();
  15. RunnableFuture<T> ftask = newTaskFor(task);
  16. execute(ftask);
  17. return ftask;
  18. }

一個最基本的概念是,submit 方法中,引數是 Runnable 型別(也有Callable 型別),這個引數不是用於 new Thread(runnable).start() 中的,此處的這個引數不是用於啟動執行緒的,這裡指的是任務,任務要做的事情是 run() 方法裡面定義的或 Callable 中的 call() 方法裡面定義的。

初學者往往會搞混這個,因為 Runnable 總是在各個地方出現,經常把一個 Runnable 包到另一個 Runnable 中。請把它想象成有個 Task 介面,這個接口裡面有一個 run() 方法(我想作者只是不想因為這個再定義一個完全可以用 Runnable 來代替的介面,Callable 的出現,完全是因為 Runnable 不能滿足需要)。

我們回過神來繼續往下看,我畫了一個簡單的示意圖來描述執行緒池中的一些主要的構件:

當然,上圖沒有考慮佇列是否有界,提交任務時佇列滿了怎麼辦?什麼情況下會建立新的執行緒?提交任務時執行緒池滿了怎麼辦?空閒執行緒怎麼關掉?這些問題下面我們會一一解決。

我們經常會使用Executors這個工具類來快速構造一個執行緒池,對於初學者而言,這種工具類是很有用的,開發者不需要關注太多的細節,只要知道自己需要一個執行緒池,僅僅提供必需的引數就可以了,其他引數都採用作者提供的預設值。

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }
  6. public static ExecutorService newCachedThreadPool() {
  7. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  8. 60L, TimeUnit.SECONDS,
  9. new SynchronousQueue<Runnable>());
  10. }

這裡先不說有什麼區別,它們最終都會導向這個構造方法:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. if (corePoolSize < 0 ||
  9. maximumPoolSize <= 0 ||
  10. maximumPoolSize < corePoolSize ||
  11. keepAliveTime < 0)
  12. throw new IllegalArgumentException();
  13. // 這幾個引數都是必須要有的
  14. if (workQueue == null || threadFactory == null || handler == null)
  15. throw new NullPointerException();
  16. this.corePoolSize = corePoolSize;
  17. this.maximumPoolSize = maximumPoolSize;
  18. this.workQueue = workQueue;
  19. this.keepAliveTime = unit.toNanos(keepAliveTime);
  20. this.threadFactory = threadFactory;
  21. this.handler = handler;
  22. }

基本上,上面的構造方法中列出了我們最需要關心的幾個屬性了,下面逐個介紹下構造方法中出現的這幾個屬性:

  • corePoolSize

    核心執行緒數,不要摳字眼,反正先記著有這麼個屬性就可以了。

  • maximumPoolSize

    ​最大執行緒數,執行緒池允許建立的最大執行緒數。

  • workQueue

    任務佇列,BlockingQueue 介面的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。

  • keepAliveTime

    空閒執行緒的保活時間,如果某執行緒的空閒時間超過這個值都沒有任務給它做,那麼可以被關閉了。注意這個值並不會對所有執行緒起作用,如果執行緒池中的執行緒數少於等於核心執行緒數 corePoolSize,那麼這些執行緒不會因為空閒太長時間而被關閉,當然,也可以通過呼叫allowCoreThreadTimeOut(true)使核心執行緒數內的執行緒也可以被回收。

  • threadFactory

    用於生成執行緒,一般我們可以用預設的就可以了。通常,我們可以通過它將我們的執行緒的名字設定得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。

  • handler:

    當執行緒池已經滿了,但是又有新的任務提交的時候,該採取什麼策略由這個來指定。有幾種方式可供選擇,像丟擲異常、直接拒絕然後返回等,也可以自己實現相應的介面實現自己的邏輯,這個之後再說。

除了上面幾個屬性外,我們再看看其他重要的屬性。

Doug Lea 採用一個 32 位的整數來存放執行緒池的狀態和當前池中的執行緒數,其中高 3 位用於存放執行緒池狀態,低 29 位表示執行緒數(即使只有 29 位,也已經不小了,大概 5 億多,現在還沒有哪個機器能起這麼多執行緒的吧)。我們知道,java 語言在整數編碼上是統一的,都是採用補碼的形式,下面是簡單的移位操作和布林操作,都是挺簡單的。

  1. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2. // 這裡 COUNT_BITS 設定為 29(32-3),意味著前三位用於存放執行緒狀態,後29位用於存放執行緒數
  3. // 很多初學者很喜歡在自己的程式碼中寫很多 29 這種數字,或者某個特殊的字串,然後分佈在各個地方,這是非常糟糕的
  4. private static final int COUNT_BITS = Integer.SIZE - 3;
  5. // 000 11111111111111111111111111111
  6. // 這裡得到的是 29 個 1,也就是說執行緒池的最大執行緒數是 2^29-1=536870911
  7. // 以我們現在計算機的實際情況,這個數量還是夠用的
  8. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  9. // 我們說了,執行緒池的狀態存放在高 3 位中
  10. // 運算結果為 111跟29個0:111 00000000000000000000000000000
  11. private static final int RUNNING = -1 << COUNT_BITS;
  12. // 000 00000000000000000000000000000
  13. private static final int SHUTDOWN = 0 << COUNT_BITS;
  14. // 001 00000000000000000000000000000
  15. private static final int STOP = 1 << COUNT_BITS;
  16. // 010 00000000000000000000000000000
  17. private static final int TIDYING = 2 << COUNT_BITS;
  18. // 011 00000000000000000000000000000
  19. private static final int TERMINATED = 3 << COUNT_BITS;
  20. // 將整數 c 的低 29 位修改為 0,就得到了執行緒池的狀態
  21. private static int runStateOf(int c) { return c & ~CAPACITY; }
  22. // 將整數 c 的高 3 為修改為 0,就得到了執行緒池中的執行緒數
  23. private static int workerCountOf(int c) { return c & CAPACITY; }
  24. private static int ctlOf(int rs, int wc) { return rs | wc; }
  25. /*
  26. * Bit field accessors that don't require unpacking ctl.
  27. * These depend on the bit layout and on workerCount being never negative.
  28. */
  29. private static boolean runStateLessThan(int c, int s) {
  30. return c < s;
  31. }
  32. private static boolean runStateAtLeast(int c, int s) {
  33. return c >= s;
  34. }
  35. private static boolean isRunning(int c) {
  36. return c < SHUTDOWN;
  37. }

上面就是對一個整數的簡單的位操作,幾個操作方法將會在後面的原始碼中一直出現,所以讀者最好把方法名字和其代表的功能記住,看原始碼的時候也就不需要來來回回翻了。

在這裡,介紹下執行緒池中的各個狀態和狀態變化的轉換過程:

  • RUNNING:這個沒什麼好說的,這是最正常的狀態:接受新的任務,處理等待佇列中的任務
  • SHUTDOWN:不接受新的任務提交,但是會繼續處理等待佇列中的任務
  • STOP:不接受新的任務提交,不再處理等待佇列中的任務,中斷正在執行任務的執行緒
  • TIDYING:所有的任務都銷燬了,workCount 為 0。執行緒池的狀態在轉換為 TIDYING 狀態時,會執行鉤子方法 terminated()
  • TERMINATED:terminated() 方法結束後,執行緒池的狀態就會變成這個

RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等於 0 的時候不能提交任務,大於 0 的話,連正在執行的任務也需要中斷。

看了這幾種狀態的介紹,讀者大體也可以猜到十之八九的狀態轉換了,各個狀態的轉換過程有以下幾種:

  • RUNNING -> SHUTDOWN:當呼叫了 shutdown() 後,會發生這個狀態轉換,這也是最重要的
  • (RUNNING or SHUTDOWN) -> STOP:當呼叫 shutdownNow() 後,會發生這個狀態轉換,這下要清楚 shutDown() 和 shutDownNow() 的區別了
  • SHUTDOWN -> TIDYING:當任務佇列和執行緒池都清空後,會由 SHUTDOWN 轉換為 TIDYING
  • STOP -> TIDYING:當任務佇列清空後,發生這個轉換
  • TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束後

上面的幾個記住核心的就可以了,尤其第一個和第二個。

另外,我們還要看看一個內部類 Worker,因為 Doug Lea 把執行緒池中的執行緒包裝成了一個個 Worker,翻譯成工人,就是執行緒池中做任務的執行緒。所以到這裡,我們知道任務是 Runnable(內部叫 task 或 command),執行緒是 Worker。

Worker 這裡又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在併發中真的是到處出現,而且非常容易使用,寫少量的程式碼就能實現自己需要的同步方式(對 AQS 原始碼感興趣的讀者請參看我之前寫的幾篇文章)。

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. private static final long serialVersionUID = 6138294804551838833L;
  6. // 這個是真正的執行緒,任務靠你啦
  7. final Thread thread;
  8. // 前面說了,這裡的 Runnable 是任務。為什麼叫 firstTask?因為在建立執行緒的時候,如果同時指定了
  9. // 這個執行緒起來以後需要執行的第一個任務,那麼第一個任務就是存放在這裡的(執行緒可不止執行這一個任務)
  10. // 當然了,也可以為 null,這樣執行緒起來了,自己到任務佇列(BlockingQueue)中取任務(getTask 方法)就行了
  11. Runnable firstTask;
  12. // 用於存放此執行緒完全的任務數,注意了,這裡用了 volatile,保證可見性
  13. volatile long completedTasks;
  14. // Worker 只有這一個構造方法,傳入 firstTask,也可以傳 null
  15. Worker(Runnable firstTask) {
  16. setState(-1); // inhibit interrupts until runWorker
  17. this.firstTask = firstTask;
  18. // 呼叫 ThreadFactory 來建立一個新的執行緒
  19. this.thread = getThreadFactory().newThread(this);
  20. }
  21. // 這裡呼叫了外部類的 runWorker 方法
  22. public void run() {
  23. runWorker(this);
  24. }
  25. ...// 其他幾個方法沒什麼好看的,就是用 AQS 操作,來獲取這個執行緒的執行權,用了獨佔鎖
  26. }

前面雖然囉嗦,但是簡單。有了上面的這些基礎後,我們終於可以看看 ThreadPoolExecutor 的 execute 方法了,前面原始碼分析的時候也說了,各種方法都最終依賴於 execute 方法:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. // 前面說的那個表示 “執行緒池狀態” 和 “執行緒數” 的整數
  5. int c = ctl.get();
  6. // 如果當前執行緒數少於核心執行緒數,那麼直接新增一個 worker 來執行任務,
  7. // 建立一個新的執行緒,並把當前任務 command 作為這個執行緒的第一個任務(firstTask)
  8. if (workerCountOf(c) < corePoolSize) {
  9. // 新增任務成功,那麼就結束了。提交任務嘛,執行緒池已經接受了這個任務,這個方法也就可以返回了
  10. // 至於執行的結果,到時候會包裝到 FutureTask 中。
  11. // 返回 false 代表執行緒池不允許提交任務
  12. if (addWorker(command, true))
  13. return;
  14. c = ctl.get();
  15. }
  16. // 到這裡說明,要麼當前執行緒數大於等於核心執行緒數,要麼剛剛 addWorker 失敗了
  17. // 如果執行緒池處於 RUNNING 狀態,把這個任務新增到任務佇列 workQueue 中
  18. if (isRunning(c) && workQueue.offer(command)) {
  19. /* 這裡面說的是,如果任務進入了 workQueue,我們是否需要開啟新的執行緒
  20. * 因為執行緒數在 [0, corePoolSize) 是無條件開啟新的執行緒
  21. * 如果執行緒數已經大於等於 corePoolSize,那麼將任務新增到佇列中,然後進到這裡
  22. */
  23. int recheck = ctl.get();
  24. // 如果執行緒池已不處於 RUNNING 狀態,那麼移除已經入隊的這個任務,並且執行拒絕策略
  25. if (! isRunning(recheck) && remove(command))
  26. reject(command);
  27. // 如果執行緒池還是 RUNNING 的,並且執行緒數為 0,那麼開啟新的執行緒
  28. // 到這裡,我們知道了,這塊程式碼的真正意圖是:擔心任務提交到佇列中了,但是執行緒都關閉了
  29. else if (workerCountOf(recheck) == 0)
  30. addWorker(null, false);
  31. }
  32. // 如果 workQueue 佇列滿了,那麼進入到這個分支
  33. // 以 maximumPoolSize 為界建立新的 worker,
  34. // 如果失敗,說明當前執行緒數已經達到 maximumPoolSize,執行拒絕策略
  35. else if (!addWorker(command, false))
  36. reject(command);
  37. }

對建立執行緒的錯誤理解:如果執行緒數少於 corePoolSize,建立一個執行緒,如果執行緒數在 [corePoolSize, maximumPoolSize] 之間那麼可以建立執行緒或複用空閒執行緒,keepAliveTime 對這個區間的執行緒有效。

從上面的幾個分支,我們就可以看出,上面的這段話是錯誤的。

上面這些一時半會也不可能全部消化搞定,我們先繼續往下吧,到時候再回頭看幾遍。

這個方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎麼建立新的執行緒的:

  1. // 第一個引數是準備提交給這個執行緒執行的任務,之前說了,可以為 null
  2. // 第二個引數為 true 代表使用核心執行緒數 corePoolSize 作為建立執行緒的界線,也就說建立這個執行緒的時候,
  3. // 如果執行緒池中的執行緒總數已經達到 corePoolSize,那麼不能響應這次建立執行緒的請求
  4. // 如果是 false,代表使用最大執行緒數 maximumPoolSize 作為界線
  5. private boolean addWorker(Runnable firstTask, boolean core) {
  6. retry:
  7. for (;;) {
  8. int c = ctl.get();
  9. int rs = runStateOf(c);
  10. // 這個非常不好理解
  11. // 如果執行緒池已關閉,並滿足以下條件之一,那麼不建立新的 worker:
  12. // 1. 執行緒池狀態大於 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED
  13. // 2. firstTask != null
  14. // 3. workQueue.isEmpty()
  15. // 簡單分析下:
  16. // 還是狀態控制的問題,當執行緒池處於 SHUTDOWN 的時候,不允許提交任務,但是已有的任務繼續執行
  17. // 當狀態大於 SHUTDOWN 時,不允許提交任務,且中斷正在執行的任務
  18. // 多說一句:如果執行緒池處於 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那麼是允許建立 worker 的
  19. if (rs >= SHUTDOWN &&
  20. ! (rs == SHUTDOWN &&
  21. firstTask == null &&
  22. ! workQueue.isEmpty()))
  23. return false;
  24. for (;;) {
  25. int wc = workerCountOf(c);
  26. if (wc >= CAPACITY ||
  27. wc >= (core ? corePoolSize : maximumPoolSize))
  28. return false;
  29. // 如果成功,那麼就是所有建立執行緒前的條件校驗都滿足了,準備建立執行緒執行任務了
  30. // 這裡失敗的話,說明有其他執行緒也在嘗試往執行緒池中建立執行緒
  31. if (compareAndIncrementWorkerCount(c))
  32. break retry;
  33. // 由於有併發,重新再讀取一下 ctl
  34. c = ctl.get();
  35. // 正常如果是 CAS 失敗的話,進到下一個裡層的for迴圈就可以了
  36. // 可是如果是因為其他執行緒的操作,導致執行緒池的狀態發生了變更,如有其他執行緒關閉了這個執行緒池
  37. // 那麼需要回到外層的for迴圈
  38. if (runStateOf(c) != rs)
  39. continue retry;
  40. // else CAS failed due to workerCount change; retry inner loop
  41. }
  42. }
  43. /*
  44. * 到這裡,我們認為在當前這個時刻,可以開始建立執行緒來執行任務了,
  45. * 因為該校驗的都校驗了,至於以後會發生什麼,那是以後的事,至少當前是滿足條件的
  46. */
  47. // worker 是否已經啟動
  48. boolean workerStarted = false;
  49. // 是否已將這個 worker 新增到 workers 這個 HashSet 中
  50. boolean workerAdded = false;
  51. Worker w = null;
  52. try {
  53. final ReentrantLock mainLock = this.mainLock;
  54. // 把 firstTask 傳給 worker 的構造方法
  55. w = new Worker(firstTask);
  56. // 取 worker 中的執行緒物件,之前說了,Worker的構造方法會呼叫 ThreadFactory 來建立一個新的執行緒
  57. final Thread t = w.thread;
  58. if (t != null) {
  59. // 這個是整個類的全域性鎖,持有這個鎖才能讓下面的操作“順理成章”,
  60. // 因為關閉一個執行緒池需要這個鎖,至少我持有鎖的期間,執行緒池不會被關閉
  61. mainLock.lock();
  62. try {
  63. int c = ctl.get();
  64. int rs = runStateOf(c);
  65. // 小於 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況
  66. // 如果等於 SHUTDOWN,前面說了,不接受新的任務,但是會繼續執行等待佇列中的任務
  67. if (rs < SHUTDOWN ||
  68. (rs == SHUTDOWN && firstTask == null)) {
  69. // worker 裡面的 thread 可不能是已經啟動的
  70. if (t.isAlive())
  71. throw new IllegalThreadStateException();
  72. // 加到 workers 這個 HashSet 中
  73. workers.add(w);
  74. int s = workers.size();
  75. // largestPoolSize 用於記錄 workers 中的個數的最大值
  76. // 因為 workers 是不斷增加減少的,通過這個值可以知道執行緒池的大小曾經達到的最大值
  77. if (s > largestPoolSize)
  78. largestPoolSize = s;
  79. workerAdded = true;
  80. }
  81. } finally {
  82. mainLock.unlock();
  83. }
  84. // 新增成功的話,啟動這個執行緒
  85. if (workerAdded) {
  86. // 啟動執行緒
  87. t.start();
  88. workerStarted = true;
  89. }
  90. }
  91. } finally {
  92. // 如果執行緒沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,將其減掉
  93. if (! workerStarted)
  94. addWorkerFailed(w);
  95. }
  96. // 返回執行緒是否啟動成功
  97. return workerStarted;
  98. }

簡單看下 addWorkFailed 的處理:

  1. // workers 中刪除掉相應的 worker
  2. // workCount 減 1
  3. private void addWorkerFailed(Worker w) {
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. if (w != null)
  8. workers.remove(w);
  9. decrementWorkerCount();
  10. // rechecks for termination, in case the existence of this worker was holding up termination
  11. tryTerminate();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. }

回過頭來,繼續往下走。我們知道,worker 中的執行緒 start 後,其 run 方法會呼叫 runWorker 方法:

  1. // Worker 類的 run() 方法
  2. public void run() {
  3. runWorker(this);
  4. }

繼續往下看 runWorker 方法:

  1. // 此方法由 worker 執行緒啟動後呼叫,這裡用一個 while 迴圈來不斷地從等待佇列中獲取任務並執行
  2. // 前面說了,worker 在初始化的時候,可以指定 firstTask,那麼第一個任務也就可以不需要從佇列中獲取
  3. final void runWorker(Worker w) {
  4. //
  5. Thread wt = Thread.currentThread();
  6. // 該執行緒的第一個任務(如果有的話)
  7. Runnable task = w.firstTask;
  8. w.firstTask = null;
  9. w.unlock(); // allow interrupts
  10. boolean completedAbruptly = true;
  11. try {
  12. // 迴圈呼叫 getTask 獲取任務
  13. while (task != null || (task = getTask()) != null) {
  14. w.lock();
  15. // 如果執行緒池狀態大於等於 STOP,那麼意味著該執行緒也要中斷
  16. if ((runStateAtLeast(ctl.get(), STOP) ||
  17. (Thread.interrupted() &&
  18. runStateAtLeast(ctl.get(), STOP))) &&
  19. !wt.isInterrupted())
  20. wt.interrupt();
  21. try {
  22. // 這是一個鉤子方法,留給需要的子類實現
  23. beforeExecute(wt, task);
  24. Throwable thrown = null;
  25. try {
  26. // 到這裡終於可以執行任務了
  27. task.run();
  28. } catch (RuntimeException x) {
  29. thrown = x; throw x;
  30. } catch (Error x) {
  31. thrown = x; throw x;
  32. } catch (Throwable x) {
  33. // 這裡不允許丟擲 Throwable,所以轉換為 Error
  34. thrown = x; throw new Error(x);
  35. } finally {
  36. // 也是一個鉤子方法,將 task 和異常作為引數,留給需要的子類實現
  37. afterExecute(task, thrown);
  38. }
  39. } finally {
  40. // 置空 task,準備 getTask 獲取下一個任務
  41. task = null;
  42. // 累加完成的任務數
  43. w.completedTasks++;
  44. // 釋放掉 worker 的獨佔鎖
  45. w.unlock();
  46. }
  47. }
  48. completedAbruptly = false;
  49. } finally {
  50. // 如果到這裡,需要執行執行緒關閉:
  51. // 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉
  52. // 2. 任務執行過程中發生了異常
  53. // 第一種情況,已經在程式碼處理了將 workCount 減 1,這個在 getTask 方法分析中會說
  54. // 第二種情況,workCount 沒有進行處理,所以需要在 processWorkerExit 中處理
  55. // 限於篇幅,我不準備分析這個方法了,感興趣的讀者請自行分析原始碼
  56. processWorkerExit(w, completedAbruptly);
  57. }
  58. }

我們看看 getTask() 是怎麼獲取任務的,這個方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:

  1. // 此方法有三種可能:
  2. // 1. 阻塞直到獲取到任務返回。我們知道,預設 corePoolSize 之內的執行緒是不會被回收的,
  3. // 它們會一直等待任務
  4. // 2. 超時退出。keepAliveTime 起作用的時候,也就是如果這麼多時間內都沒有任務,那麼應該執行關閉
  5. // 3. 如果發生了以下條件,此方法必須返回 null:
  6. // - 池中有大於 maximumPoolSize 個 workers 存在(通過呼叫 setMaximumPoolSize 進行設定)
  7. // - 執行緒池處於 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務
  8. // - 執行緒池處於 STOP,不僅不接受新的執行緒,連 workQueue 中的執行緒也不再執行
  9. private Runnable getTask() {
  10. boolean timedOut = false; // Did the last poll() time out?
  11. retry:
  12. for (;;) {
  13. int c = ctl.get();
  14. int rs = runStateOf(c);
  15. // 兩種可能
  16. // 1. rs == SHUTDOWN && workQueue.isEmpty()
  17. // 2. rs >= STOP
  18. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  19. // CAS 操作,減少工作執行緒數
  20. decrementWorkerCount();
  21. return null;
  22. }
  23. boolean timed; // Are workers subject to culling?
  24. for (;;) {
  25. int wc = workerCountOf(c);
  26. // 允許核心執行緒數內的執行緒回收,或當前執行緒數超過了核心執行緒數,那麼有可能發生超時關閉
  27. timed = allowCoreThreadTimeOut || wc > corePoolSize;
  28. // 這裡 break,是為了不往下執行後一個 if (compareAndDecrementWorkerCount(c))
  29. // 兩個 if 一起看:如果當前執行緒數 wc > maximumPoolSize,或者超時,都返回 null
  30. // 那這裡的問題來了,wc > maximumPoolSize 的情況,為什麼要返回 null?
  31. // 換句話說,返回 null 意味著關閉執行緒。
  32. // 那是因為有可能開發者呼叫了 setMaximumPoolSize 將執行緒池的 maximumPoolSize 調小了
  33. if (wc <= maximumPoolSize && ! (timedOut && timed))
  34. break;
  35. if (compareAndDecrementWorkerCount(c))
  36. return null;
  37. c = ctl.get(); // Re-read ctl
  38. // compareAndDecrementWorkerCount(c) 失敗,執行緒池中的執行緒數發生了改變
  39. if (runStateOf(c) != rs)
  40. continue retry;
  41. // else CAS failed due to workerCount change; retry inner loop
  42. }
  43. // wc <= maximumPoolSize 同時沒有超時
  44. try {
  45. // 到 workQueue 中獲取任務
  46. Runnable r = timed ?
  47. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  48. workQueue.take();
  49. if (r != null)
  50. return r;
  51. timedOut = true;
  52. } catch (InterruptedException retry) {
  53. // 如果此 worker 發生了中斷,採取的方案是重試
  54. // 解釋下為什麼會發生中斷,這個讀者要去看 setMaximumPoolSize 方法,
  55. // 如果開發者將 maximumPoolSize 調小了,導致其小於當前的 workers 數量,
  56. // 那麼意味著超出的部分執行緒要被關閉。重新進入 for 迴圈,自然會有部分執行緒會返回 null
  57. timedOut = false;
  58. }
  59. }
  60. }

到這裡,基本上也說完了整個流程,讀者這個時候應該回到 execute(Runnable command) 方法,看看各個分支,我把程式碼貼過來一下:

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. // 前面說的那個表示 “執行緒池狀態” 和 “執行緒數” 的整數
  5. int c = ctl.get();
  6. // 如果當前執行緒數少於核心執行緒數,那麼直接新增一個 worker 來執行任務,
  7. // 建立一個新的執行緒,並把當前任務 command 作為這個執行緒的第一個任務(firstTask)
  8. if (workerCountOf(c) < corePoolSize) {
  9. // 新增任務成功,那麼就結束了。提交任務嘛,執行緒池已經接受了這個任務,這個方法也就可以返回了
  10. // 至於執行的結果,到時候會包裝到 FutureTask 中。
  11. // 返回 false 代表執行緒池不允許提交任務
  12. if (addWorker(command, true))
  13. return;
  14. c = ctl.get();
  15. }
  16. // 到這裡說明,要麼當前執行緒數大於等於核心執行緒數,要麼剛剛 addWorker 失敗了
  17. // 如果執行緒池處於 RUNNING 狀態,把這個任務新增到任務佇列 workQueue 中
  18. if (isRunning(c) && workQueue.offer(command)) {
  19. /* 這裡面說的是,如果任務進入了 workQueue,我們是否需要開啟新的執行緒
  20. * 因為執行緒數在 [0, corePoolSize) 是無條件開啟新的執行緒
  21. * 如果執行緒數已經大於等於 corePoolSize,那麼將任務新增到佇列中,然後進到這裡
  22. */
  23. int recheck = ctl.get();
  24. // 如果執行緒池已不處於 RUNNING 狀態,那麼移除已經入隊的這個任務,並且執行拒絕策略
  25. if (! isRunning(recheck) && remove(command))
  26. reject(command);
  27. // 如果執行緒池還是 RUNNING 的,並且執行緒數為 0,那麼開啟新的執行緒
  28. // 到這裡,我們知道了,這塊程式碼的真正意圖是:擔心任務提交到佇列中了,但是執行緒都關閉了
  29. else if (workerCountOf(recheck) == 0)
  30. addWorker(null, false);
  31. }
  32. // 如果 workQueue 佇列滿了,那麼進入到這個分支
  33. // 以 maximumPoolSize 為界建立新的 worker,
  34. // 如果失敗,說明當前執行緒數已經達到 maximumPoolSize,執行拒絕策略
  35. else if (!addWorker(command, false))
  36. reject(command);
  37. }

上面各個分支中,有兩種情況會呼叫 reject(command) 來處理任務,因為按照正常的流程,執行緒池此時不能接受這個任務,所以需要執行我們的拒絕策略。接下來,我們說一說 ThreadPoolExecutor 中的拒絕策略。

  1. final void reject(Runnable command) {
  2. // 執行拒絕策略
  3. handler.rejectedExecution(command, this);
  4. }

此處的 handler 我們需要在構造執行緒池的時候就傳入這個引數,它是 RejectedExecutionHandler 的例項。

RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經定義好的實現類可供我們直接使用,當然,我們也可以實現自己的策略,不過一般也沒有必要。

  1. // 只要執行緒池沒有被關閉,那麼由提交任務的執行緒自己來執行這個任務。
  2. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  3. public CallerRunsPolicy() { }
  4. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5. if (!e.isShutdown()) {
  6. r.run();
  7. }
  8. }
  9. }
  10. // 不管怎樣,直接丟擲 RejectedExecutionException 異常
  11. // 這個是預設的策略,如果我們構造執行緒池的時候不傳相應的 handler 的話,那就會指定使用這個
  12. public static class AbortPolicy implements RejectedExecutionHandler {
  13. public AbortPolicy() { }
  14. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  15. throw new RejectedExecutionException("Task " + r.toString() +
  16. " rejected from " +
  17. e.toString());
  18. }
  19. }
  20. // 不做任何處理,直接忽略掉這個任務
  21. public static class DiscardPolicy implements RejectedExecutionHandler {
  22. public DiscardPolicy() { }
  23. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  24. }
  25. }
  26. // 這個相對霸道一點,如果執行緒池沒有被關閉的話,
  27. // 把佇列隊頭的任務(也就是等待了最長時間的)直接扔掉,然後提交這個任務到等待佇列中
  28. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  29. public DiscardOldestPolicy() { }
  30. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  31. if (!e.isShutdown()) {
  32. e.getQueue().poll();
  33. e.execute(r);
  34. }
  35. }
  36. }

到這裡,ThreadPoolExecutor 的原始碼算是分析結束了。單純從原始碼的難易程度來說,ThreadPoolExecutor 的原始碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了。

結束執行緒池的相關方法

tryTerminate()

當執行緒池涉及到要移除worker時候都會呼叫tryTerminate(),該方法主要用於判斷執行緒池中的執行緒是否已經全部移除了,如果是的話則關閉執行緒池。

  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. // 執行緒池處於Running狀態
  5. // 執行緒池已經終止了
  6. // 執行緒池處於ShutDown狀態,但是阻塞佇列不為空
  7. if (isRunning(c) ||
  8. runStateAtLeast(c, TIDYING) ||
  9. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  10. return;
  11. // 執行到這裡,就意味著執行緒池要麼處於STOP狀態,要麼處於SHUTDOWN且阻塞佇列為空
  12. // 這時如果執行緒池中還存線上程,則會嘗試中斷執行緒
  13. if (workerCountOf(c) != 0) {
  14. // /執行緒池還有執行緒,但是佇列沒有任務了,需要中斷喚醒等待任務的執行緒
  15. // (runwoker的時候首先就通過w.unlock設定執行緒可中斷,getTask最後面的catch處理中斷)
  16. interruptIdleWorkers(ONLY_ONE);
  17. return;
  18. }
  19. final ReentrantLock mainLock = this.mainLock;
  20. mainLock.lock();
  21. try {
  22. // 嘗試終止執行緒池
  23. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  24. try {
  25. terminated();
  26. } finally {
  27. // 執行緒池狀態轉為TERMINATED
  28. ctl.set(ctlOf(TERMINATED, 0));
  29. termination.signalAll();
  30. }
  31. return;
  32. }
  33. } finally {
  34. mainLock.unlock();
  35. }
  36. }
  37. }

在關閉執行緒池的過程中,如果執行緒池處於STOP狀態或者處於SHUDOWN狀態且阻塞佇列為null,則執行緒池會呼叫interruptIdleWorkers()方法中斷所有執行緒,注意ONLY_ONE== true,表示僅中斷一個執行緒。

interruptIdleWorkers

  1. private void interruptIdleWorkers(boolean onlyOne) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) {
  6. Thread t = w.thread;
  7. if (!t.isInterrupted() && w.tryLock()) {
  8. try {
  9. t.interrupt();
  10. } catch (SecurityException ignore) {
  11. } finally {
  12. w.unlock();
  13. }
  14. }
  15. if (onlyOne)
  16. break;
  17. }
  18. } finally {
  19. mainLock.unlock();
  20. }
  21. }

onlyOne==true僅終止一個執行緒,否則終止所有執行緒。

執行緒終止

執行緒池ThreadPoolExecutor提供了shutdown()和shutDownNow()用於關閉執行緒池。

shutdown():按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務。

shutdownNow() :嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。

shutdown

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. // 推進執行緒狀態
  7. advanceRunState(SHUTDOWN);
  8. // 中斷空閒的執行緒
  9. interruptIdleWorkers();
  10. // 交給子類實現
  11. onShutdown();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. tryTerminate();
  16. }

shutdownNow

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. advanceRunState(STOP);
  8. // 中斷所有執行緒
  9. interruptWorkers();
  10. // 返回等待執行的任務列表
  11. tasks = drainQueue();
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. tryTerminate();
  16. return tasks;
  17. }

與shutdown不同,shutdownNow會呼叫interruptWorkers()方法中斷所有執行緒。

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

同時會呼叫drainQueue()方法返回等待執行到任務列表。

  1. private List<Runnable> drainQueue() {
  2. BlockingQueue<Runnable> q = workQueue;
  3. ArrayList<Runnable> taskList = new ArrayList<Runnable>();
  4. q.drainTo(taskList);
  5. if (!q.isEmpty()) {
  6. for (Runnable r : q.toArray(new Runnable[0])) {
  7. if (q.remove(r))
  8. taskList.add(r);
  9. }
  10. }
  11. return taskList;
  12. }

Executors

這節其實也不是分析 Executors 這個類,因為它僅僅是工具類,它的所有方法都是 static 的。

1. FixedThreadPool

建立固定長度的執行緒池,每次提交任務建立一個執行緒,直到達到執行緒池的最大數量,執行緒池的大小不再變化。

這個執行緒池可以建立固定執行緒數的執行緒池。特點就是可以重用固定數量執行緒的執行緒池。它的構造原始碼如下:

1 2 3 4 5 publicstaticExecutorService newFixedThreadPool(intnThreads) { returnnewThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnable>()); }
  • FixedThreadPool的corePoolSize和maxiumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads。
  • 0L則表示當執行緒池中的執行緒數量操作核心執行緒的數量時,多餘的執行緒將被立即停止
  • 最後一個引數表示FixedThreadPool使用了無界佇列LinkedBlockingQueue作為執行緒池的做工佇列,由於是無界的,當執行緒池的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池的執行緒數量不會超過corePoolSize,同時maxiumPoolSize也就變成了一個無效的引數,並且執行中的執行緒池並不會拒絕任務。

FixedThreadPool執行圖如下

執行過程如下:

1.如果當前工作中的執行緒數量少於corePool的數量,就建立新的執行緒來執行任務。

2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.執行緒執行完1中的任務後會從佇列中去任務。

注意LinkedBlockingQueue是無界佇列,所以可以一直新增新任務到執行緒池。

2. SingleThreadExecutor  

SingleThreadExecutor是使用單個worker執行緒的Executor。特點是使用單個工作執行緒執行任務。它的構造原始碼如下:

1 2 3 4 5 6 publicstaticExecutorService newSingleThreadExecutor() { returnnewFinalizableDelegatedExecutorService (newThreadPoolExecutor(1,1, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnable>())); }

  

SingleThreadExecutor的corePoolSize和maxiumPoolSize都被設定1。 其他引數均與FixedThreadPool相同,其執行圖如下:

執行過程如下:

1.如果當前工作中的執行緒數量少於corePool的數量,就建立一個新的執行緒來執行任務。

2.當執行緒池的工作中的執行緒數量達到了corePool,則將任務加入LinkedBlockingQueue。

3.執行緒執行完1中的任務後會從佇列中去任務。

注意:由於線上程池中只有一個工作執行緒,所以任務可以按照新增順序執行。

3. CachedThreadPool

CachedThreadPool是一個”無限“容量的執行緒池,它會根據需要建立新執行緒。特點是可以根據需要來建立新的執行緒執行任務,沒有特定的corePool。下面是它的構造方法:

1 2 3 4 5 publicstaticExecutorService newCachedThreadPool() { returnnewThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, newSynchronousQueue<Runnable>()); }

  

CachedThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximum是無界的。這裡keepAliveTime設定為60秒,意味著空閒的執行緒最多可以等待任務60秒,否則將被回收。 CachedThreadPool使用沒有容量的SynchronousQueue作為主執行緒池的工作佇列,它是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作。這意味著,如果主執行緒提交任務的速度高於執行緒池中處理任務的速度時,CachedThreadPool會不斷建立新執行緒。極端情況下,CachedThreadPool會因為建立過多執行緒而耗盡CPU資源。其執行圖如下:

執行過程如下:

1.首先執行SynchronousQueue.offer(Runnable task)。如果在當前的執行緒池中有空閒的執行緒正在執行SynchronousQueue.poll(),那麼主執行緒執行的offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行。,execute()方法執行成功,否則執行步驟2

2.當執行緒池為空(初始maximumPool為空)或沒有空閒執行緒時,配對失敗,將沒有執行緒執行SynchronousQueue.poll操作。這種情況下,執行緒池會建立一個新的執行緒執行任務。

3.在建立完新的執行緒以後,將會執行poll操作。當步驟2的執行緒執行完成後,將等待60秒,如果此時主執行緒提交了一個新任務,那麼這個空閒執行緒將執行新任務,否則被回收。因此長時間不提交任務的CachedThreadPool不會佔用系統資源。

SynchronousQueue是一個不儲存元素阻塞佇列,每次要進行offer操作時必須等待poll操作,否則不能繼續新增元素。

具體使用案例

(1). newCachedThreadPool
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:

Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for(inti =0; i <10; i++) { finalintindex = i; try{ Thread.sleep(index *1000); }catch(InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(newRunnable() { @Override publicvoidrun() { System.out.println(index); } }); }

執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。

(2). newFixedThreadPool
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:

Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for(inti =0; i <10; i++) { finalintindex = i; fixedThreadPool.execute(newRunnable() { @Override publicvoidrun() { try{ System.out.println(index); Thread.sleep(2000); }catch(InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }

  

因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。

定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache

(3) newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:

Java
1 2 3 4 5 6 7 8 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(newRunnable() { @Override publicvoidrun() { System.out.println("delay 3 seconds"); } },3, TimeUnit.SECONDS);

表示延遲3秒執行。

定期執行示例程式碼如下:

Java
1 2 3 4 5 6 7 scheduledThreadPool.scheduleAtFixedRate(newRunnable() { @Override publicvoidrun() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } },1,3, TimeUnit.SECONDS);

表示延遲1秒後每3秒執行一次。

ScheduledExecutorService比Timer更安全,功能更強大,後面會有一篇單獨進行對比。

(4)、newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:

Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for(inti =0; i <10; i++) { finalintindex = i; singleThreadExecutor.execute(newRunnable() { @Override publicvoidrun() { try{ System.out.println(index); Thread.sleep(2000); }catch(InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }

結果依次輸出,相當於順序執行各個任務。

現行大多數GUI程式都是單執行緒的。Android中單執行緒可用於資料庫操作,檔案操作,應用批量安裝,應用批量刪除等不適合併發但可能IO阻塞性及影響UI執行緒響應的操作。

特別的執行緒池:ScheduledThreadPoolExecutor

原文出處http://cmsblogs.com/chenssy

在上篇部落格【死磕Java併發】-----J.U.C之執行緒池:ThreadPoolExecutor已經介紹了執行緒池中最核心的類ThreadPoolExecutor,這篇就來看看另一個核心類ScheduledThreadPoolExecutor的實現。

ScheduledThreadPoolExecutor解析

我們知道Timer與TimerTask雖然可以實現執行緒的週期和延遲排程,但是Timer與TimerTask存在一些缺陷,所以對於這種定期、週期執行任務的排程策略,我們一般都是推薦ScheduledThreadPoolExecutor來實現。下面就深入分析ScheduledThreadPoolExecutor是如何來實現執行緒的週期、延遲排程的。

ScheduledThreadPoolExecutor,繼承ThreadPoolExecutor且實現了ScheduledExecutorService介面,它就相當於提供了“延遲”和“週期執行”功能的ThreadPoolExecutor。在JDK API中是這樣定義它的:ThreadPoolExecutor,它可另行安排在給定的延遲後執行命令,或者定期執行命令。需要多個輔助執行緒時,或者要求 ThreadPoolExecutor 具有額外的靈活性或功能時,此類要優於 Timer。 一旦啟用已延遲的任務就執行它,但是有關何時啟用,啟用後何時執行則沒有任何實時保證。按照提交的先進先出 (FIFO) 順序來啟用那些被安排在同一執行時間的任務。

它提供了四個構造方法:

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }
  5. public ScheduledThreadPoolExecutor(int corePoolSize,
  6. ThreadFactory threadFactory) {
  7. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  8. new DelayedWorkQueue(), threadFactory);
  9. }
  10. public ScheduledThreadPoolExecutor(int corePoolSize,
  11. RejectedExecutionHandler handler) {
  12. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  13. new DelayedWorkQueue(), handler);
  14. }
  15. public ScheduledThreadPoolExecutor(int corePoolSize,
  16. ThreadFactory threadFactory,
  17. RejectedExecutionHandler handler) {
  18. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  19. new DelayedWorkQueue(), threadFactory, handler);
  20. }

當然我們一般都不會直接通過其建構函式來生成一個ScheduledThreadPoolExecutor物件(例如new ScheduledThreadPoolExecutor(10)之類的),而是通過Executors類(例如Executors.newScheduledThreadPool(int);)

在ScheduledThreadPoolExecutor的建構函式中,我們發現它都是利用ThreadLocalExecutor來構造的,唯一變動的地方就在於它所使用的阻塞佇列變成了DelayedWorkQueue,而不是ThreadLocalhExecutor的LinkedBlockingQueue(通過Executors產生ThreadLocalhExecutor物件)。DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類,它其實和阻塞佇列DelayQueue有點兒類似。DelayQueue是可以提供延遲的阻塞佇列,它只有在延遲期滿時才能從中提取元素,其列頭是延遲期滿後儲存時間最長的Delayed元素。如果延遲都還沒有期滿,則佇列沒有頭部,並且 poll 將返回 null。有關於DelayQueue的更多介紹請參考這篇部落格【死磕Java併發】-----J.U.C之阻塞佇列:DelayQueue。所以DelayedWorkQueue中的任務必然是按照延遲時間從短到長來進行排序的。下面我們再深入分析DelayedWorkQueue,這裡留一個引子。

ScheduledThreadPoolExecutor提供瞭如下四個方法,也就是四個排程器:

  1. schedule(Callable callable, long delay, TimeUnit unit) :建立並執行在給定延遲後啟用的 ScheduledFuture。
  2. schedule(Runnable command, long delay, TimeUnit unit) :建立並執行在給定延遲後啟用的一次性操作。
  3. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :建立並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在 initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推。
  4. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

第一、二個方法差不多,都是一次性操作,只不過引數一個是Callable,一個是Runnable。稍微分析下第三(scheduleAtFixedRate)、四個(scheduleWithFixedDelay)方法,加入initialDelay = 5,period/delay = 3,unit為秒。如果每個執行緒都是都執行非常良好不存在延遲的問題,那麼這兩個方法執行緒執行週期是5、8、11、14、17.......,但是如果存在延遲呢?比如第三個執行緒用了5秒鐘,那麼這兩個方法的處理策略是怎樣的?第三個方法(scheduleAtFixedRate)是週期固定,也就說它是不會受到這個延遲的影響的,每個執行緒的排程週期在初始化時就已經絕對了,是什麼時候排程就是什麼時候排程,它不會因為上一個執行緒的排程失效延遲而受到影響。但是第四個方法(scheduleWithFixedDelay),則不一樣,它是每個執行緒的排程間隔固定,也就是說第一個執行緒與第二執行緒之間間隔delay,第二個與第三個間隔delay,以此類推。如果第二執行緒推遲了那麼後面所有的執行緒排程都會推遲,例如,上面第二執行緒推遲了2秒,那麼第三個就不再是11秒執行了,而是13秒執行。

檢視著四個方法的原始碼,會發現其實他們的處理邏輯都差不多,所以我們就挑scheduleWithFixedDelay方法來分析,如下:

  1. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
  2. long initialDelay,
  3. long delay,
  4. TimeUnit unit) {
  5. if (command == null || unit == null)
  6. throw new NullPointerException();
  7. if (delay <= 0)
  8. throw new IllegalArgumentException();
  9. ScheduledFutureTask<Void> sft =
  10. new ScheduledFutureTask<Void>(command,
  11. null,
  12. triggerTime(initialDelay, unit),
  13. unit.toNanos(-delay));
  14. RunnableScheduledFuture<Void> t = decorateTask(command, sft);
  15. sft.outerTask = t;
  16. delayedExecute(t);
  17. return t;
  18. }

scheduleWithFixedDelay方法處理的邏輯如下:

  1. 校驗,如果引數不合法則丟擲異常
  2. 構造一個task,該task為ScheduledFutureTask
  3. 呼叫delayedExecute()方法做後續相關處理

這段程式碼涉及兩個類ScheduledFutureTask和RunnableScheduledFuture,其中RunnableScheduledFuture不用多說,他繼承RunnableFuture和ScheduledFuture兩個介面,除了具備RunnableFuture和ScheduledFuture兩類特性外,它還定義了一個方法isPeriodic() ,該方法用於判斷執行的任務是否為定期任務,如果是則返回true。而ScheduledFutureTask作為ScheduledThreadPoolExecutor的內部類,它扮演著極其重要的作用,因為它的作用則是負責ScheduledThreadPoolExecutor中任務的排程。

ScheduledFutureTask內部繼承FutureTask,實現RunnableScheduledFuture介面,它內部定義了三個比較重要的變數

  1. /** 任務被新增到ScheduledThreadPoolExecutor中的序號 */
  2. private final long sequenceNumber;
  3. /** 任務要執行的具體時間 */
  4. private long time;
  5. /** 任務的間隔週期 /
  6. private final long period;

這三個變數與任務的執行有著非常密切的關係,什麼關係?先看ScheduledFutureTask的幾個建構函式和核心方法:

  1. ScheduledFutureTask(Runnable r, V result, long ns) {
  2. super(r, result);
  3. this.time = ns;
  4. this.period = 0;
  5. this.sequenceNumber = sequencer.getAndIncrement();
  6. }
  7. ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  8. super(r, result);
  9. this.time = ns;
  10. this.period = period;
  11. this.sequenceNumber = sequencer.getAndIncrement();
  12. }
  13. ScheduledFutureTask(Callable<V> callable, long ns) {
  14. super(callable);
  15. this.time = ns;
  16. this.period = 0;
  17. this.sequenceNumber = sequencer.getAndIncrement();
  18. }
  19. ScheduledFutureTask(Callable<V> callable, long ns) {
  20. super(callable);
  21. this.time = ns;
  22. this.period = 0;
  23. this.sequenceNumber = sequencer.getAndIncrement();
  24. }

ScheduledFutureTask 提供了四個構造方法,這些構造方法與上面三個引數是不是一一對應了?這些引數有什麼用,如何用,則要看ScheduledFutureTask在那些方法使用了該方法,在ScheduledFutureTask中有一個compareTo()方法:

  1. public int compareTo(Delayed other) {
  2. if (other == this) // compare zero if same object
  3. return 0;
  4. if (other instanceof ScheduledFutureTask) {
  5. ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
  6. long diff = time - x.time;
  7. if (diff < 0)
  8. return -1;
  9. else if (diff > 0)
  10. return 1;
  11. else if (sequenceNumber < x.sequenceNumber)
  12. return -1;
  13. else
  14. return 1;
  15. }
  16. long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
  17. return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
  18. }

相信各位都知道該方法是幹嘛用的,提供一個排序演算法,該演算法規則是:首先按照time排序,time小的排在前面,大的排在後面,如果time相同,則使用sequenceNumber排序,小的排在前面,大的排在後面。那麼為什麼在這個類裡面提供compareTo()方法呢?在前面就介紹過ScheduledThreadPoolExecutor在構造方法中提供的是DelayedWorkQueue()佇列中,也就是說ScheduledThreadPoolExecutor是把任務新增到DelayedWorkQueue中的,而DelayedWorkQueue則是類似於DelayQueue,內部維護著一個以時間為先後順序的佇列,所以compareTo()方法使用與DelayedWorkQueue佇列對其元素ScheduledThreadPoolExecutor task進行排序的演算法。

排序已經解決了,那麼ScheduledThreadPoolExecutor 是如何對task任務進行排程和延遲的呢?任何執行緒的執行,都是通過run()方法執行,ScheduledThreadPoolExecutor 的run()方法如下:

  1. public void run() {
  2. boolean periodic = isPeriodic();
  3. if (!canRunInCurrentRunState(periodic))
  4. cancel(false);
  5. else if (!periodic)
  6. ScheduledFutureTask.super.run();
  7. else if (ScheduledFutureTask.super.runAndReset()) {
  8. setNextRunTime();
  9. reExecutePeriodic(outerTask);
  10. }
  11. }
  1. 呼叫isPeriodic()獲取該執行緒是否為週期性任務標誌,然後呼叫canRunInCurrentRunState()方法判斷該執行緒是否可以執行,如果不可以執行則呼叫cancel()取消任務。
  2. 如果當執行緒已經到達了執行點,則呼叫run()方法執行task,該run()方法是在FutureTask中定義的。
  3. 否則呼叫runAndReset()方法執行並充值,呼叫setNextRunTime()方法計算任務下次的執行時間,重新把任務新增到佇列中,讓該任務可以重複執行。

isPeriodic()

該方法用於判斷指定的任務是否為定期任務。

  1. public boolean isPeriodic() {
  2. return period != 0;
  3. }

canRunInCurrentRunState()判斷任務是否可以取消,cancel()取消任務,這兩個方法比較簡單,而run()執行任務,runAndReset()執行並重置狀態,牽涉比較廣,我們放在FutureTask後面介紹。所以重點介紹setNextRunTime()和reExecutePeriodic()這兩個涉及到延遲的方法。

setNextRunTime()

setNextRunTime()方法用於重新計算任務的下次執行時間。如下:

  1. private void setNextRunTime() {
  2. long p = period;
  3. if (p > 0)
  4. time += p;
  5. else
  6. time = triggerTime(-p);
  7. }

該方法定義很簡單,p > 0 ,time += p ,否則呼叫triggerTime()方法重新計算time:

  1. long triggerTime(long delay) {
  2. return now() +
  3. ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
  4. }

reExecutePeriodic

  1. void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  2. if (canRunInCurrentRunState(true)) {
  3. super.getQueue().add(task);
  4. if (!canRunInCurrentRunState(true) && remove(task))
  5. task.cancel(false);
  6. else
  7. ensurePrestart();
  8. }
  9. }

reExecutePeriodic重要的是呼叫super.getQueue().add(task);將任務task加入的佇列DelayedWorkQueue中。ensurePrestart()在【死磕Java併發】-----J.U.C之執行緒池:ThreadPoolExecutor已經做了詳細介紹。

到這裡ScheduledFutureTask已經介紹完了,ScheduledFutureTask在ScheduledThreadPoolExecutor扮演作用的重要性不言而喻。其實ScheduledThreadPoolExecutor的實現不是很複雜,因為有FutureTask和ThreadPoolExecutor的支撐,其實現就顯得不是那麼難了。

總結

我一向不喜歡寫總結,因為我把所有需要表達的都寫在正文中了,寫小篇幅的總結並不能真正將話說清楚,本文的總結部分為準備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者。

  1. java 執行緒池有哪些關鍵屬性?

    corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler

    corePoolSize 到 maximumPoolSize 之間的執行緒會被回收,當然 corePoolSize 的執行緒也可以通過設定而得到回收(allowCoreThreadTimeOut(true))。

    workQueue 用於存放任務,新增任務的時候,如果當前執行緒數超過了 corePoolSize,那麼往該佇列中插入任務,執行緒池中的執行緒會負責到佇列中拉取任務。

    keepAliveTime 用於設定空閒時間,如果執行緒數超出了 corePoolSize,並且有些執行緒的空閒時間超過了這個值,會執行關閉這些執行緒的操作

    rejectedExecutionHandler 用於處理當執行緒池不能執行此任務時的情況,預設有丟擲 RejectedExecutionException 異常、忽略任務、使用提交任務的執行緒來執行此任務和將佇列中等待最久的任務刪除,然後提交此任務這四種策略,預設為丟擲異常。

  2. 說說執行緒池中的執行緒建立時機?

    1. 如果當前執行緒數少於 corePoolSize,那麼提交任務的時候建立一個新的執行緒,並由這個執行緒執行這個任務;
    2. 如果當前執行緒數已經達到 corePoolSize,那麼將提交的任務新增到佇列中,等待執行緒池中的執行緒去佇列中取任務;
    3. 如果佇列已滿,那麼建立新的執行緒來執行任務,需要保證池中的執行緒數不會超過 maximumPoolSize,如果此時執行緒數超過了 maximumPoolSize,那麼執行拒絕策略。

    * 注意:如果將佇列設定為無界佇列,那麼執行緒數達到 corePoolSize 後,其實執行緒數就不會再增長了。

  3. Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構造出來的執行緒池有什麼差別?

    細說太長,往上滑一點點,在 Executors 的小節進行了詳盡的描述。

  4. 任務執行過程中發生異常怎麼處理?

    如果某個任務執行出現異常,那麼執行任務的執行緒會被關閉,而不是繼續接收其他任務。然後會啟動一個新的執行緒來代替它。

  5. 什麼時候會執行拒絕策略?

    1. workers 的數量達到了 corePoolSize(任務此時需要進入任務佇列),任務入隊成功,與此同時執行緒池被關閉了,而且關閉執行緒池並沒有將這個任務出隊,那麼執行拒絕策略。這裡說的是非常邊界的問題,入隊和關閉執行緒池併發執行,讀者仔細看看 execute 方法是怎麼進到第一個 reject(command) 裡面的。
    2. workers 的數量大於等於 corePoolSize,將任務加入到任務佇列,可是佇列滿了,任務入隊失敗,那麼準備開啟新的執行緒,可是執行緒數已經達到 maximumPoolSize,那麼執行拒絕策略。