執行緒池原理分析(一)-執行緒池體系結構
概述
隨著摩爾定律失效,多核計算器成為主流,多執行緒提高執行效率就變得異常重要,而執行緒的建立銷燬又是一個開銷比較大的操作,於是就產生了執行緒池,把使用過的執行緒放入執行緒池中,重複利用,其思想就是這些,很簡單,但是執行緒池的管理就沒有那麼簡單了,首先要管理好多個執行緒,然後還要管理任務,所以整個事情就變得複雜起來,本篇先介紹一下執行緒池的繼承結構圖,簡單介紹下各個介面和類的作用,下一篇文章再分析執行緒池。
執行緒池繼承結構圖
簡化結構圖如下
上面第一幅圖把執行緒池的關鍵的類都畫出來了,第二幅圖是一個簡化的圖,這個裡面的實現才是核心,第一幅圖先不要管,先介紹第二幅圖,當第二幅圖介紹完之後,再回過頭來看第一幅圖。
Executor
public interface Executor { void execute(Runnable command); }
執行緒池是用來管理執行緒的,而執行緒是用來執行任務的,所以在最頂層設計了Executor介面,該介面就一個方法,用來執行任務。
ExecutorService
public interface ExecutorService extends Executor { //關閉執行緒池,但阻塞佇列中的任務繼續執行完之後才關閉 void shutdown(); //關閉執行緒池,阻塞佇列中的任務直接丟棄 List<Runnable> shutdownNow(); //是否處於SHUTDOWN狀態 boolean isShutdown(); //是否處於TERMINATED狀態 boolean isTerminated(); //等待執行緒池進入TERMINATED狀態 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //提交任務,不過任務有返回值 <T> Future<T> submit(Callable<T> task); //提交任務,不過任務有返回值 <T> Future<T> submit(Runnable task, T result); //提交Runnable任務,會自動封裝成Callable任務 Future<?> submit(Runnable task); /** * 執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * 執行給定的任務,當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 執行給定的任務,如果某個任務已成功完成(也就是未丟擲異常),則返回其結果 */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * 執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未丟擲異常),則返回其結果 */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
這個介面主要作用就是提交任務,讓介面Executor中的execute方法執行,開頭的幾個方法shutdow(),shutdownNow()等,這些都是讓執行緒池結束的方法,但是執行緒池有好幾個狀態,而執行緒池結束就是在這些狀態之間轉換,具體到介紹ThreadPoolExecutor類的時候在介紹。
AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService { //程式碼就不貼了 }
這個類是一個抽象類,實現了ExecutorService,主要是為該介面的方法提供一些預設的實現。
ThreadPoolExecutor
這個類就是執行緒池實現的核心類,執行緒池的執行緒管理和阻塞佇列管理都是它完成的,下一篇文章會詳細介紹這個類。
ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService { //執行一次性任務,固定延遲之後開始執行,有返回結果 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); //一次性任務,有返回 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); //固定延遲之後開始執行,之後按照固定時間間隔執行,如果在間隔時間內上一次排程還沒有執行完 //不會影響下一次執行 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); //固定延遲之後開始執行,之後按照固定時間間隔執行,如果在間隔時間內上一次排程還沒有執行完 //下一次排程任務即便時間已經到了也不會執行,會等到上一次任務執行完之後在等待固定時間間隔之後執行 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
在ExecutorService介面中定義了很多的提交任務的方法,但是那些提交任務的方法都沒有設定延遲執行的,這個介面定義了4個固定延時的方法,其中最後兩個還可以定時執行。
ScheduledThreadPoolExecutor
java.util.concurrent.ScheduledThreadPoolExecutor
,繼承 ThreadPoolExecutor ,並且實現 ScheduledExecutorService 介面,是兩者的集大成者,相當於提供了“延遲”和“週期執行”功能的 ThreadPoolExecutor 。
這裡簡單提一下其設計思想,大家都知道執行緒池中有一個佇列用於放任務,如果這個佇列是一個特殊的佇列,比如DelayedQueue(ScheduledThreadPoolExecutor中使用的佇列和這個類似),這個佇列可以控制佇列中的節點延遲時間,如果佇列中的節點沒有到期,通過take等方法就無法獲取到值,會阻塞,直到延遲時間結束,take才可以獲取到,這樣就可以做到定時執行。到這裡可能有的胖友有疑問,如果從佇列中取出來,那佇列中不就沒有這個任務了,那下次再定時執行怎麼辦,這個就是ScheduledFutureTask(這個類上面沒有介紹,看名字大家應該可以猜到,和FutureTask作用差不多)的事情了,該類會重寫run方法,在run方法中會再次將任務放入到佇列中。
具體可以參考:【死磕Java併發】—–J.U.C之執行緒池:ScheduledThreadPoolExecutor
小結
以上就是第二幅圖相關介面和類的介紹,其中執行緒池和定時排程類沒有詳細介紹,只是大致提了一下,因為這兩個類很複雜,後面專門寫文章介紹。下面就看一下第一幅圖多出來的部分的內容。
Executors
靜態工廠類,提供了 Executor、ExecutorService 、ScheduledExecutorService、ThreadFactory 、Callable 等類的靜態工廠方法,通過這些工廠方法我們可以得到相對應的物件。舉例:
ExecutorService service = Executors.newFixedThreadPool(10);
Future
public interface Future<V> { //取消當前任務 boolean cancel(boolean mayInterruptIfRunning); //是否已經取消 boolean isCancelled(); //是否執行完成 boolean isDone(); //獲取執行完成的結果,如果還沒有執行完成就阻塞 V get() throws InterruptedException, ExecutionException; //獲取執行完成的結果,等待固定的時間,如果規定時間內還沒有執行完,拋異常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
獲取任務執行狀態的一個類,當任務提交到執行緒池,需要獲取任務執行的結果,如果執行緒池中堆積了很多的任務,那很多工都無法及時的獲取執行結果,這個時候就看使用者的個人選擇了,是使用get等待,還是直接取消該任務。
RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }|
該介面繼承了Runnable,那實現這個介面的類就可以直接作為任務傳入執行緒池執行,而該介面同時又實現了Future介面,這就說明還可以去獲取任務的執行狀態。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; //省略部分程式碼 }
這個類就實現了RunnableFuture,該類是一個核心類,如果要獲取執行緒的執行結果,就是通過這個類獲取的,該類的實現和AQS很類似,裡面也是搞了一個阻塞佇列,然後搞了一個state來管理狀態。其實這個阻塞佇列就是一個單鏈表,裡面放的是要獲取該任務執行結果的執行緒,如果該任務還沒有執行完,那這些執行緒就無法獲取到結果,就會放入到了連結串列中,state的作用就是存放任務的狀態,上面貼出的程式碼中可以看出任務可以處於以上7種狀態,多個執行緒都可以操作state,所以要通過CAS來加鎖修改,其實思想和AQS很類似。後面會詳細分析。
具體大家可以參考:FutureTask原始碼解析(2)——深入理解FutureTask
CompletionService
public interface CompletionService<V> { //提交任務 Future<V> submit(Callable<V> task); //提交任務 Future<V> submit(Runnable task, V result); //獲取任務執行結果 Future<V> take() throws InterruptedException; //獲取任務執行結果 Future<V> poll(); //獲取任務執行結果 Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
這個介面主要作用其實就是為了彌補Future的不足,通過Future的get方法可以獲取執行結果,這個確實沒問題,思考一下如下場景,如果提交了很多的任務,然後就等待獲取這些任務的執行結果,但是又不知道哪個任務先執行完,如果從頭開始遍歷,如果第一個任務一下子執行了2個小時,那就阻塞了2小時,但是在這個兩小時中可能已經有很多別的任務執行完了,我們本可以先去處理那些已經執行完的任務,但是卻白白等了2小時,當然了,對於有經驗的胖友來說,可以遍歷然後呼叫isDone()方法,沒有完成就過,然後一直死迴圈,直到所有的執行緒執行完,這樣子也行,只是不夠優雅。
大家如果仔細看上面介面的定義,會發現很多的方法好像是操作佇列的方法,其實像take,poll就是操作佇列的方法,具體的實現是這樣的,CompletionService的實現類會在類中定義一個阻塞佇列,當有任務執行完了,任務的執行結果就放入佇列中,然後我們只需要呼叫take方法獲取就可以了,當佇列中為空的時候,就阻塞等待,這樣實現起來就很優雅。
具體大家可以參考:CompletionService和ExecutorCompletionService詳解
ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; //省略部分程式碼 }
這個類就是上面那個介面的實現類,裡面有如下程式碼
private final BlockingQueue<Future<V>> completionQueue;
這個阻塞佇列就是放執行完成結果的。具體的思想在上面介紹介面的時候已經介紹了,這裡就不在贅述了。
Delayed
public interface Delayed extends Comparable<Delayed> { //獲取延遲時間 long getDelay(TimeUnit unit); }
該介面定義很簡單,就只有一個方法。
ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> { }
這個介面從名稱就可以看出來,就是實現延時執行,並且還要獲取執行結果。
總結
上面籠統的把每個類和介面給簡單介紹了一下,下面梳理一下。其實和執行緒池相關的類中有四個類是很核心的類,分別如下:
-
ThreadPoolExecutor:這個就不用多說了,實現執行緒池的,核心類
- FutureTask:如果不用獲取定時任務執行結果,直接使用Runnable提交任務就可以了,如果要獲取執行結果就需要使用FutureTask提交任務
-
ExecutorCompletionService:該類組合了上面兩個類,執行緒池的作用是用來執行任務,而執行的任務就是FutureTask,然後該類還使用了一個阻塞佇列把已經執行完成的任務放入到佇列,方便獲取
-
ScheduledThreadPoolExecutor:該類是實現定時排程的核心類,其執行也是需要依賴執行緒池
從上面的介紹中大家可以看出,最後兩個類都要依賴於執行緒池,所以ThreadPoolExecutor才是最核心的,下一篇文章就介紹這個類。上面的介紹有點亂,只是想把執行緒池相關的重要的類都給梳理下,有個整體印象,之後在學習的時候知道自己處在什麼位置,還有哪些沒有學習到。
參考:
【死磕 Java 併發】—– J.U.C 之執行緒池:執行緒池的基礎架構
【死磕Java併發】—–J.U.C之執行緒池:執行緒池的基礎架構
jdk1.8原始碼