1. 程式人生 > 實用技巧 >執行緒池原理分析(一)-執行緒池體系結構

執行緒池原理分析(一)-執行緒池體系結構

概述

  隨著摩爾定律失效,多核計算器成為主流,多執行緒提高執行效率就變得異常重要,而執行緒的建立銷燬又是一個開銷比較大的操作,於是就產生了執行緒池,把使用過的執行緒放入執行緒池中,重複利用,其思想就是這些,很簡單,但是執行緒池的管理就沒有那麼簡單了,首先要管理好多個執行緒,然後還要管理任務,所以整個事情就變得複雜起來,本篇先介紹一下執行緒池的繼承結構圖,簡單介紹下各個介面和類的作用,下一篇文章再分析執行緒池。

執行緒池繼承結構圖

簡化結構圖如下

上面第一幅圖把執行緒池的關鍵的類都畫出來了,第二幅圖是一個簡化的圖,這個裡面的實現才是核心,第一幅圖先不要管,先介紹第二幅圖,當第二幅圖介紹完之後,再回過頭來看第一幅圖。

Executor

public interface Executor {
     void execute(Runnable command);
}

  執行緒池是用來管理執行緒的,而執行緒是用來執行任務的,所以在最頂層設計了Executor介面,該介面就一個方法,用來執行任務。

ExecutorService

public interface ExecutorService extends Executor {
    
    //關閉執行緒池,但阻塞佇列中的任務繼續執行完之後才關閉
    void shutdown();
    //關閉執行緒池,阻塞佇列中的任務直接丟棄
    List<Runnable> shutdownNow();
    //是否處於SHUTDOWN狀態
    boolean isShutdown();
    //是否處於TERMINATED狀態
    boolean isTerminated();
    //等待執行緒池進入TERMINATED狀態
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    //提交任務,不過任務有返回值
    <T> Future<T> submit(Callable<T> task);
    //提交任務,不過任務有返回值
    <T> Future<T> submit(Runnable task, T result);
    //提交Runnable任務,會自動封裝成Callable任務
    Future<?> submit(Runnable task);
     /**
     * 執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
     /**
     * 執行給定的任務,當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
     /**
     * 執行給定的任務,如果某個任務已成功完成(也就是未丟擲異常),則返回其結果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
     /**
     * 執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未丟擲異常),則返回其結果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
  

}    

  這個介面主要作用就是提交任務,讓介面Executor中的execute方法執行,開頭的幾個方法shutdow(),shutdownNow()等,這些都是讓執行緒池結束的方法,但是執行緒池有好幾個狀態,而執行緒池結束就是在這些狀態之間轉換,具體到介紹ThreadPoolExecutor類的時候在介紹。

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {

     //程式碼就不貼了
}

這個類是一個抽象類,實現了ExecutorService,主要是為該介面的方法提供一些預設的實現。

ThreadPoolExecutor

這個類就是執行緒池實現的核心類,執行緒池的執行緒管理和阻塞佇列管理都是它完成的,下一篇文章會詳細介紹這個類。

ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {
    //執行一次性任務,固定延遲之後開始執行,有返回結果
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    //一次性任務,有返回
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    //固定延遲之後開始執行,之後按照固定時間間隔執行,如果在間隔時間內上一次排程還沒有執行完
   //不會影響下一次執行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit); 
    //固定延遲之後開始執行,之後按照固定時間間隔執行,如果在間隔時間內上一次排程還沒有執行完
   //下一次排程任務即便時間已經到了也不會執行,會等到上一次任務執行完之後在等待固定時間間隔之後執行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

在ExecutorService介面中定義了很多的提交任務的方法,但是那些提交任務的方法都沒有設定延遲執行的,這個介面定義了4個固定延時的方法,其中最後兩個還可以定時執行。

ScheduledThreadPoolExecutor

  java.util.concurrent.ScheduledThreadPoolExecutor,繼承 ThreadPoolExecutor ,並且實現 ScheduledExecutorService 介面,是兩者的集大成者,相當於提供了“延遲”和“週期執行”功能的 ThreadPoolExecutor 。

  這裡簡單提一下其設計思想,大家都知道執行緒池中有一個佇列用於放任務,如果這個佇列是一個特殊的佇列,比如DelayedQueue(ScheduledThreadPoolExecutor中使用的佇列和這個類似),這個佇列可以控制佇列中的節點延遲時間,如果佇列中的節點沒有到期,通過take等方法就無法獲取到值,會阻塞,直到延遲時間結束,take才可以獲取到,這樣就可以做到定時執行。到這裡可能有的胖友有疑問,如果從佇列中取出來,那佇列中不就沒有這個任務了,那下次再定時執行怎麼辦,這個就是ScheduledFutureTask(這個類上面沒有介紹,看名字大家應該可以猜到,和FutureTask作用差不多)的事情了,該類會重寫run方法,在run方法中會再次將任務放入到佇列中。

具體可以參考:【死磕Java併發】—–J.U.C之執行緒池:ScheduledThreadPoolExecutor

小結

  以上就是第二幅圖相關介面和類的介紹,其中執行緒池和定時排程類沒有詳細介紹,只是大致提了一下,因為這兩個類很複雜,後面專門寫文章介紹。下面就看一下第一幅圖多出來的部分的內容。

Executors

靜態工廠類,提供了 Executor、ExecutorService 、ScheduledExecutorService、ThreadFactory 、Callable 等類的靜態工廠方法,通過這些工廠方法我們可以得到相對應的物件。舉例:

ExecutorService service = Executors.newFixedThreadPool(10);

Future

public interface Future<V> {
    //取消當前任務
    boolean cancel(boolean mayInterruptIfRunning);
    //是否已經取消
    boolean isCancelled();
    //是否執行完成
    boolean isDone();
    //獲取執行完成的結果,如果還沒有執行完成就阻塞
    V get() throws InterruptedException, ExecutionException;
    //獲取執行完成的結果,等待固定的時間,如果規定時間內還沒有執行完,拋異常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

  獲取任務執行狀態的一個類,當任務提交到執行緒池,需要獲取任務執行的結果,如果執行緒池中堆積了很多的任務,那很多工都無法及時的獲取執行結果,這個時候就看使用者的個人選擇了,是使用get等待,還是直接取消該任務。

RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {

    void run();
}|

  該介面繼承了Runnable,那實現這個介面的類就可以直接作為任務傳入執行緒池執行,而該介面同時又實現了Future介面,這就說明還可以去獲取任務的執行狀態。

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

   //省略部分程式碼

}

  這個類就實現了RunnableFuture,該類是一個核心類,如果要獲取執行緒的執行結果,就是通過這個類獲取的,該類的實現和AQS很類似,裡面也是搞了一個阻塞佇列,然後搞了一個state來管理狀態。其實這個阻塞佇列就是一個單鏈表,裡面放的是要獲取該任務執行結果的執行緒,如果該任務還沒有執行完,那這些執行緒就無法獲取到結果,就會放入到了連結串列中,state的作用就是存放任務的狀態,上面貼出的程式碼中可以看出任務可以處於以上7種狀態,多個執行緒都可以操作state,所以要通過CAS來加鎖修改,其實思想和AQS很類似。後面會詳細分析。

具體大家可以參考:FutureTask原始碼解析(2)——深入理解FutureTask

CompletionService

public interface CompletionService<V> {
    //提交任務
    Future<V> submit(Callable<V> task);
    //提交任務
    Future<V> submit(Runnable task, V result);
    //獲取任務執行結果
    Future<V> take() throws InterruptedException;
    //獲取任務執行結果
    Future<V> poll(); 
    //獲取任務執行結果
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

}

  這個介面主要作用其實就是為了彌補Future的不足,通過Future的get方法可以獲取執行結果,這個確實沒問題,思考一下如下場景,如果提交了很多的任務,然後就等待獲取這些任務的執行結果,但是又不知道哪個任務先執行完,如果從頭開始遍歷,如果第一個任務一下子執行了2個小時,那就阻塞了2小時,但是在這個兩小時中可能已經有很多別的任務執行完了,我們本可以先去處理那些已經執行完的任務,但是卻白白等了2小時,當然了,對於有經驗的胖友來說,可以遍歷然後呼叫isDone()方法,沒有完成就過,然後一直死迴圈,直到所有的執行緒執行完,這樣子也行,只是不夠優雅。

  大家如果仔細看上面介面的定義,會發現很多的方法好像是操作佇列的方法,其實像take,poll就是操作佇列的方法,具體的實現是這樣的,CompletionService的實現類會在類中定義一個阻塞佇列,當有任務執行完了,任務的執行結果就放入佇列中,然後我們只需要呼叫take方法獲取就可以了,當佇列中為空的時候,就阻塞等待,這樣實現起來就很優雅。

具體大家可以參考:CompletionService和ExecutorCompletionService詳解

ExecutorCompletionService

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
//省略部分程式碼
}

  這個類就是上面那個介面的實現類,裡面有如下程式碼

 private final BlockingQueue<Future<V>> completionQueue;

  這個阻塞佇列就是放執行完成結果的。具體的思想在上面介紹介面的時候已經介紹了,這裡就不在贅述了。

Delayed

public interface Delayed extends Comparable<Delayed> {
    //獲取延遲時間
    long getDelay(TimeUnit unit);
}

  該介面定義很簡單,就只有一個方法。

ScheduledFuture

public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

  這個介面從名稱就可以看出來,就是實現延時執行,並且還要獲取執行結果。

總結

上面籠統的把每個類和介面給簡單介紹了一下,下面梳理一下。其實和執行緒池相關的類中有四個類是很核心的類,分別如下:

  • ThreadPoolExecutor:這個就不用多說了,實現執行緒池的,核心類

  • FutureTask:如果不用獲取定時任務執行結果,直接使用Runnable提交任務就可以了,如果要獲取執行結果就需要使用FutureTask提交任務
  • ExecutorCompletionService:該類組合了上面兩個類,執行緒池的作用是用來執行任務,而執行的任務就是FutureTask,然後該類還使用了一個阻塞佇列把已經執行完成的任務放入到佇列,方便獲取

  • ScheduledThreadPoolExecutor:該類是實現定時排程的核心類,其執行也是需要依賴執行緒池

從上面的介紹中大家可以看出,最後兩個類都要依賴於執行緒池,所以ThreadPoolExecutor才是最核心的,下一篇文章就介紹這個類。上面的介紹有點亂,只是想把執行緒池相關的重要的類都給梳理下,有個整體印象,之後在學習的時候知道自己處在什麼位置,還有哪些沒有學習到。

參考:

【死磕 Java 併發】—– J.U.C 之執行緒池:執行緒池的基礎架構

【死磕Java併發】—–J.U.C之執行緒池:執行緒池的基礎架構

jdk1.8原始碼