1. 程式人生 > 程式設計 >深入學習java執行緒池

深入學習java執行緒池

我們都是通過new Thread來建立一個執行緒,由於執行緒的建立和銷燬都需要消耗一定的CPU資源,所以在高併發下這種建立執行緒的方式將嚴重影響程式碼執行效率。而執行緒池的作用就是讓一個執行緒執行結束後不馬上銷燬,繼續執行新的任務,這樣就節省了不斷建立執行緒和銷燬執行緒的開銷。

ThreadPoolExecutor

建立Java執行緒池最為核心的類為ThreadPoolExecutor:

QQ截圖20190630215357.png

它提供了四種建構函式來建立執行緒池,其中最為核心的建構函式如下所示:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
複製程式碼

這7個引數的含義如下:

  • corePoolSize 執行緒池核心執行緒數。即執行緒池中保留的執行緒個數,即使這些執行緒是空閒的,也不會被銷燬,除非通過ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法開啟了核心執行緒的超時策略;

  • maximumPoolSize 執行緒池中允許的最大執行緒個數;

  • keepAliveTime 用於設定那些超出核心執行緒數量的執行緒的最大等待時間,超過這個時間還沒有新任務的話,超出的執行緒將被銷燬;

  • unit 超時時間單位;

  • workQueue 執行緒佇列。用於儲存通過execute方法提交的,等待被執行的任務;

  • threadFactory 執行緒建立工程,即指定怎樣建立執行緒;

  • handler 拒絕策略。即指定當執行緒提交的數量超出了maximumPoolSize後,該使用什麼策略處理超出的執行緒。

在通過這個構造方法建立執行緒池的時候,這幾個引數必須滿足以下條件,否則將丟擲IllegalArgumentException異常:

  • corePoolSize不能小於0;

  • keepAliveTime不能小於0;

  • maximumPoolSize 不能小於等於0;

  • maximumPoolSize不能小於corePoolSize;

此外,workQueue、threadFactory和handler不能為null,否則將丟擲空指標異常。

下面舉些例子來深入理解這幾個引數的含義。

使用上面的構造方法建立一個執行緒池:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,2,10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1),(ThreadFactory) Thread::new,new ThreadPoolExecutor.AbortPolicy());
System.out.println("執行緒池建立完畢");

int activeCount = -1;
int queueSize = -1;
while (true) {
    if (activeCount != threadPoolExecutor.getActiveCount()
            || queueSize != threadPoolExecutor.getQueue().size()) {
        System.out.println("活躍執行緒個數 " + threadPoolExecutor.getActiveCount());
        System.out.println("核心執行緒個數 " + threadPoolExecutor.getCorePoolSize());
        System.out.println("佇列執行緒個數 " + threadPoolExecutor.getQueue().size());
        System.out.println("最大執行緒數 " + threadPoolExecutor.getMaximumPoolSize());
        System.out.println("------------------------------------");
        activeCount = threadPoolExecutor.getActiveCount();
        queueSize = threadPoolExecutor.getQueue().size();
    }
}
複製程式碼

上面的程式碼建立了一個核心執行緒數量為1,允許最大執行緒數量為2,最大活躍時間為10秒,執行緒佇列長度為1的執行緒池。

假如我們通過execute方法向執行緒池提交1個任務,看看結果如何:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,new ThreadPoolExecutor.AbortPolicy());
System.out.println("執行緒池建立完畢");

threadPoolExecutor.execute(() -> sleep(100));

int activeCount = -1;
int queueSize = -1;
while (true) {
    if (activeCount != threadPoolExecutor.getActiveCount()
            || queueSize != threadPoolExecutor.getQueue().size()) {
        System.out.println("活躍執行緒個數 " + threadPoolExecutor.getActiveCount());
        System.out.println("核心執行緒個數 " + threadPoolExecutor.getCorePoolSize());
        System.out.println("佇列執行緒個數 " + threadPoolExecutor.getQueue().size());
        System.out.println("最大執行緒數 " + threadPoolExecutor.getMaximumPoolSize());
        System.out.println("------------------------------------");
        activeCount = threadPoolExecutor.getActiveCount();
        queueSize = threadPoolExecutor.getQueue().size();
    }
}
複製程式碼

ThreadPoolExecutor的execute和submit方法都可以向執行緒池提交任務,區別是,submit方法能夠返回執行結果,返回值型別為Future

sleep方法程式碼:

private static void sleep(long value) {
    try {
        System.out.println(Thread.currentThread().getName() + "執行緒執行sleep方法");
        TimeUnit.SECONDS.sleep(value);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
複製程式碼

啟動程式,控制檯輸出如下:

QQ截圖20190630222238.png

執行緒池核心執行緒數量為1,通過execute提交了一個任務後,由於核心執行緒是空閒的,所以任務被執行了。由於這個任務的邏輯是休眠100秒,所以在這100秒內,執行緒池的活躍執行緒數量為1。此外,因為提交的任務被核心執行緒執行了,所以並沒有執行緒需要被放到執行緒佇列裡等待,執行緒佇列長度為0。

假如我們通過execute方法向執行緒池提交2個任務,看看結果如何:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製程式碼

QQ截圖20190701183457.png

執行緒池核心執行緒數量為1,通過execute提交了2個任務後,一開始核心執行緒是空閒的,Thread-0被執行。由於這個任務的邏輯是休眠100秒,所以在這100秒內,執行緒池的活躍執行緒數量為1。因為核心執行緒數量為1,所以另外一個任務在這100秒內不能被執行,於是被放到執行緒佇列裡等待,執行緒佇列長度為1。

假如我們通過execute方法向執行緒池提交3個任務,看看結果如何:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製程式碼

QQ截圖20190701184303.png

這三個任務都是休眠100秒,所以核心執行緒池中第一個任務正在被執行,第二個任務被放入到了執行緒佇列。而當第三個任務被提交進來時,執行緒佇列滿了(我們定義的長度為1),由於該執行緒池允許的最大執行緒數量為2,所以執行緒池還可以再建立一個執行緒來執行另外一個任務,於是乎之前線上程佇列裡的執行緒被取出執行(FIFO),第三個任務被放入到了執行緒佇列。

改變第二個和第三個任務的睡眠時間,觀察輸出:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
複製程式碼

QQ截圖20190701185215.png

第二個任務提交5秒後,任務執行完畢,所以執行緒佇列裡的任務被執行,於是佇列執行緒個數為0,活躍執行緒數量為2(第一個和第三個任務)。再過5秒後,第三個任務執行完畢,於是活躍執行緒數量為1(第一個100秒還沒執行完畢)。

在第三個任務結束的瞬間,我們觀察執行緒快照:

QQ截圖20190701185617.png

可以看到,執行緒池中有兩個執行緒,Thread-0在執行第一個任務(休眠100秒,還沒結束),Thread-1執行完第三個任務後並沒有馬上被銷燬。過段時間後(10秒鐘後)再觀察執行緒快照:

QQ截圖20190701190444.png

可以看到,Thread-1這個執行緒被銷燬了,因為我們在建立執行緒池的時候,指定keepAliveTime 為10秒,10秒後,超出核心執行緒池執行緒外的那些執行緒將被銷燬。

假如一次性提交4個任務,看看會怎樣:

threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製程式碼

QQ截圖20190701190808.png

因為我們設定的拒絕策略為AbortPolicy,所以最後提交的那個任務直接被拒絕了。更多拒絕策略下面會介紹到。

關閉執行緒池

執行緒池包含以下幾個狀態:

QQ截圖20190702100110.png

當執行緒池中所有任務都處理完畢後,執行緒並不會自己關閉。我們可以通過呼叫shutdown和shutdownNow方法來關閉執行緒池。兩者的區別在於:

shutdown方法將執行緒池置為shutdown狀態,拒絕新的任務提交,但執行緒池並不會馬上關閉,而是等待所有正在折行的和執行緒佇列裡的任務都執行完畢後,執行緒池才會被關閉。所以這個方法是平滑的關閉執行緒池。

shutdownNow方法將執行緒池置為stop狀態,拒絕新的任務提交,中斷正在執行的那些任務,並且清除執行緒佇列裡的任務並返回。所以這個方法是比較“暴力”的。

舉兩個例子觀察下兩者的區別:

shutdown例子:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,4,new ArrayBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();
    System.out.println("已經執行了執行緒池shutdown方法");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程中被打斷" + e.getMessage());
        }
    }
}
複製程式碼

啟動程式,控制檯輸出如下:

QQ截圖20190702101041.png

可以看到,雖然在任務都被提交後馬上執行了shutdown方法,但是並不會馬上關閉執行緒池,而是等待所有被提交的任務都執行完了才關閉。

shutdownNow例子:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 馬上關閉,並返回還未被執行的任務
    System.out.println(runnables);

    System.out.println("已經執行了執行緒池shutdownNow方法");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程中被打斷" + e.getMessage());
        }
    }
}
複製程式碼

啟動程式,控制檯輸出如下:

QQ截圖20190702101355.png

可以看到,在執行shutdownNow方法後,執行緒池馬上就被關閉了,正在執行中的兩個任務被打斷,並且返回了執行緒佇列中等待被執行的兩個任務。

通過上面兩個例子我們還可以看到shutdown和shutdownNow方法都不是阻塞的。常與shutdown搭配的方法有awaitTermination。

awaitTermination方法接收timeout和TimeUnit兩個引數,用於設定超時時間及單位。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。該方法是阻塞的:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new longTask());
    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();
    boolean isShutdown = threadPoolExecutor.awaitTermination(3,TimeUnit.SECONDS);
    if (isShutdown) {
        System.out.println("執行緒池在3秒內成功關閉");
    } else {
        System.out.println("等了3秒還沒關閉,不等了╰(‵□′)╯");
    }
    System.out.println("------------");
}

static class shortTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程中被打斷" + e.getMessage());
        }
    }
}
複製程式碼

啟動程式輸出如下:

QQ截圖20190702102156.png

4大拒絕策略

當執行緒池無法再接收新的任務的時候,可採取如下四種策略:

QQ截圖20190302111014.png

CallerRunsPolicy CallerRunsPolicy策略:由呼叫執行緒處理該任務:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,3,new ThreadPoolExecutor.CallerRunsPolicy());

    threadPoolExecutor.execute(new shortTask("任務1"));
    threadPoolExecutor.execute(new longTask("任務2"));
    threadPoolExecutor.execute(new longTask("任務3"));
    threadPoolExecutor.execute(new shortTask("任務4"));
    threadPoolExecutor.execute(new shortTask("任務5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程中被打斷" + e.getMessage());
        }
    }
}
複製程式碼

上面的執行緒池最多隻能一次性提交4個任務,第5個任務提交後會被拒絕策略處理。啟動程式輸出如下:

QQ截圖20190702103818.png

可以看到,第5個提交的任務由呼叫執行緒(即main執行緒)處理該任務。

AbortPolicy AbortPolicy策略:丟棄任務,並丟擲RejectedExecutionException異常。前面的例子就是使用該策略,所以不再演示。

DiscardOldestPolicy DiscardOldestPolicy策略:丟棄最早被放入到執行緒佇列的任務,將新提交的任務放入到執行緒佇列末端:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,new ThreadPoolExecutor.DiscardOldestPolicy());

    threadPoolExecutor.execute(new shortTask("任務1"));
    threadPoolExecutor.execute(new longTask("任務2"));
    threadPoolExecutor.execute(new longTask("任務3"));
    threadPoolExecutor.execute(new shortTask("任務4"));
    threadPoolExecutor.execute(new shortTask("任務5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程中被打斷" + e.getMessage());
        }
    }
}
複製程式碼

啟動程式輸出如下:

QQ截圖20190702105646.png

可以看到最後提交的任務被執行了,而第3個任務是第一個被放到執行緒佇列的任務,被丟棄了。

DiscardPolicy DiscardPolicy策略:直接丟棄新的任務,不拋異常:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,new ThreadPoolExecutor.DiscardPolicy());

    threadPoolExecutor.execute(new shortTask("任務1"));
    threadPoolExecutor.execute(new longTask("任務2"));
    threadPoolExecutor.execute(new longTask("任務3"));
    threadPoolExecutor.execute(new shortTask("任務4"));
    threadPoolExecutor.execute(new shortTask("任務5"));

    threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("shortTask執行過程中被打斷" + e.getMessage());
        }
    }
}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
        } catch (InterruptedException e) {
            System.err.println("longTask執行過程中被打斷" + e.getMessage());
        }
    }
}
複製程式碼

啟動程式,輸出如下:

QQ截圖20190702110022.png

第5個任務直接被拒絕丟棄了,而沒有丟擲任何異常。

執行緒池工廠方法

除了使用ThreadPoolExecutor的構造方法建立執行緒池外,我們也可以使用Executors提供的工廠方法來建立不同型別的執行緒池:

QQ截圖20190702110350.png

newFixedThreadPool 檢視newFixedThreadPool方法原始碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
複製程式碼

可以看到,通過newFixedThreadPool建立的是一個固定大小的執行緒池,大小由nThreads引數指定,它具有如下幾個特點:

因為corePoolSize和maximumPoolSize的值都為nThreads,所以執行緒池中執行緒數量永遠等於nThreads,不可能新建除了核心執行緒數的執行緒來處理任務,即keepAliveTime實際上在這裡是無效的。

LinkedBlockingQueue是一個無界佇列(最大長度為Integer.MAX_VALUE),所以這個執行緒池理論是可以無限的接收新的任務,這就是為什麼上面沒有指定拒絕策略的原因。

newCachedThreadPool 檢視newCachedThreadPool方法原始碼:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,new SynchronousQueue<Runnable>());
}
複製程式碼

這是一個理論上無限大小的執行緒池:

核心執行緒數為0,SynchronousQueue佇列是沒有長度的佇列,所以當有新的任務提交,如果有空閒的還未超時的(最大空閒時間60秒)執行緒則執行該任務,否則新增一個執行緒來處理該任務。

因為執行緒數量沒有限制,理論上可以接收無限個新任務,所以這裡也沒有指定拒絕策略。

newSingleThreadExecutor 檢視newSingleThreadExecutor原始碼:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1,1,new LinkedBlockingQueue<Runnable>()));
}
複製程式碼

核心執行緒數和最大執行緒數都為1,每次只能有一個執行緒處理任務。

LinkedBlockingQueue佇列可以接收無限個新任務。

newScheduledThreadPool 檢視newScheduledThreadPool原始碼:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
   
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,NANOSECONDS,new DelayedWorkQueue());
}
複製程式碼

所以newScheduledThreadPool理論是也是可以接收無限個任務,DelayedWorkQueue也是一個無界佇列。

使用newScheduledThreadPool建立的執行緒池除了可以處理普通的Runnable任務外,它還具有排程的功能:

1.延遲指定時間後執行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲5秒執行
executorService.schedule(() -> System.out.println("hello"),5,TimeUnit.SECONDS);
複製程式碼

2.按指定的速率執行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲1秒執行,然後每5秒執行一次
executorService.scheduleAtFixedRate(
        () -> System.out.println(LocalTime.now()),TimeUnit.SECONDS
);
複製程式碼

QQ截圖20190702152117.png

3.按指定的時延執行:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
        () -> System.out.println(LocalTime.now()),TimeUnit.SECONDS
);
複製程式碼

QQ截圖20190702152440.png

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay沒啥區別,實際它們還是有區別的:

scheduleAtFixedRate按照固定速率執行任務,比如每5秒執行一個任務,即使上一個任務沒有結束,5秒後也會開始處理新的任務;

scheduleWithFixedDelay按照固定的時延處理任務,比如每延遲5秒執行一個任務,無論上一個任務處理了1秒,1分鐘還是1小時,下一個任務總是在上一個任務執行完畢後5秒鐘後開始執行。

對於這些執行緒池工廠方法的使用,阿里巴巴程式設計規程指出:

QQ截圖20190702153306.png

因為這幾個執行緒池理論是都可以接收無限個任務,所以這就有記憶體溢位的風險。實際上只要我們掌握了ThreadPoolExecutor建構函式7個引數的含義,我們就可以根據不同的業務來創建出符合需求的執行緒池。一般執行緒池的建立可以參考如下規則:

IO密集型任務,執行緒池執行緒數量可以設定為2 X CPU核心數;

計算密集型任務,執行緒池執行緒數量可以設定為CPU核心數 + 1。

一些API的用法 ThreadPoolExecutor提供了幾個判斷執行緒池狀態的方法:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1,new ThreadPoolExecutor.AbortPolicy()
    );

    threadPoolExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    threadPoolExecutor.shutdown();
    System.out.println("執行緒池為shutdown狀態:" + threadPoolExecutor.isShutdown());
    System.out.println("執行緒池正在關閉:" + threadPoolExecutor.isTerminating());
    System.out.println("執行緒池已經關閉:" + threadPoolExecutor.isTerminated());
    threadPoolExecutor.awaitTermination(6,TimeUnit.SECONDS);
    System.out.println("執行緒池已經關閉" + threadPoolExecutor.isTerminated());
}
複製程式碼

程式輸出如下:

20190703205843.png

前面我們提到,執行緒池核心執行緒即使是空閒狀態也不會被銷燬,除非使用allowCoreThreadTimeOut設定了允許核心執行緒超時:

public static void main(String[] args) throws InterruptedException {
       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
               1,new ThreadPoolExecutor.AbortPolicy()
       );
       threadPoolExecutor.allowCoreThreadTimeOut(true);
       threadPoolExecutor.execute(() -> {
           try {
               TimeUnit.SECONDS.sleep(5);
               System.out.println("任務執行完畢");
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       });
   }
複製程式碼

程式輸出如下所示:

asdfasdfaaaaa.gif

5秒後任務執行完畢,核心執行緒處於空閒的狀態。因為通過allowCoreThreadTimeOut方法設定了允許核心執行緒超時,所以3秒後(keepAliveTime設定為3秒),核心執行緒被銷燬。核心執行緒被銷燬後,執行緒池也就沒有作用了,於是就自動關閉了。

值得注意的是,如果一個執行緒池呼叫了allowCoreThreadTimeOut(true)方法,那麼它的keepAliveTime不能為0。

ThreadPoolExecutor提供了一remove方法,檢視其原始碼:

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
複製程式碼

可看到,它刪除的是執行緒佇列中的任務,而非正在被執行的任務。舉個例子:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            1,new ThreadPoolExecutor.AbortPolicy()
    );
    threadPoolExecutor.execute(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("任務執行完畢");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Runnable r = () -> System.out.println("看看我是否會被刪除");
    threadPoolExecutor.execute(r);
    threadPoolExecutor.remove(r);

    threadPoolExecutor.shutdown();
}
複製程式碼

執行程式,輸出如下:

QQ截圖20190703211746.png

可看到任務並沒有被執行,已經被刪除,因為唯一一個核心執行緒已經在執行任務了,所以後提交的這個任務被放到了執行緒佇列裡,然後通過remove方法刪除。

預設情況下,只有當往執行緒池裡提交了任務後,執行緒池才會啟動核心執行緒處理任務。我們可以通過呼叫prestartCoreThread方法,讓核心執行緒即使沒有任務提交,也處於等待執行任務的活躍狀態:

public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,new ThreadPoolExecutor.AbortPolicy()
    );
    System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
    threadPoolExecutor.prestartCoreThread();
    System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
}
複製程式碼

程式輸出如下所示:

QQ截圖20190703213145.png

該方法返回boolean型別值,如果所以核心執行緒都啟動了,返回false,反之返回true。

還有一個和它類似的prestartAllCoreThreads方法,它的作用是一次性啟動所有核心執行緒,讓其處於活躍地等待執行任務的狀態。

ThreadPoolExecutor的invokeAny方法用於隨機執行任務集合中的某個任務,並返回執行結果,該方法是同步方法:

public static void main(String[] args) throws InterruptedException,ExecutionException {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,new ThreadPoolExecutor.AbortPolicy()
    );

    // 任務集合
    List<Callable<Integer>> tasks = IntStream.range(0,4).boxed().map(i -> (Callable<Integer>) () -> {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
        return i;
    }).collect(Collectors.toList());
    // 隨機執行結果
    Integer result = threadPoolExecutor.invokeAny(tasks);
    System.out.println("-------------------");
    System.out.println(result);
    threadPoolExecutor.shutdownNow();
}
複製程式碼

啟動程式,輸出如下:

QQ截圖20190704091530.png

ThreadPoolExecutor的invokeAll則是執行任務集合中的所有任務,返回Future集合:

public static void main(String[] args) throws InterruptedException,new ThreadPoolExecutor.AbortPolicy()
    );

    List<Callable<Integer>> tasks = IntStream.range(0,4).boxed().map(i -> (Callable<Integer>) () -> {
        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
        return i;
    }).collect(Collectors.toList());

    List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
    futureList.stream().map(f->{
        try {
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
           return null;
        }
    }).forEach(System.out::println);

    threadPoolExecutor.shutdownNow();
}
複製程式碼

輸出如下:

QQ截圖20190704091836.png

總結下這些方法:

方法 描述
allowCoreThreadTimeOut(boolean value) 是否允許核心執行緒空閒後超時,是的話超時後核心執行緒將銷燬,執行緒池自動關閉
awaitTermination(long timeout,TimeUnit unit) 阻塞當前執行緒,等待執行緒池關閉,timeout用於指定等待時間。
execute(Runnable command) 向執行緒池提交任務,沒有返回值
submit(Runnable task) 向執行緒池提交任務,返回Future
isShutdown() 判斷執行緒池是否為shutdown狀態
isTerminating() 判斷執行緒池是否正在關閉
isTerminated() 判斷執行緒池是否已經關閉
remove(Runnable task) 移除執行緒佇列中的指定任務
prestartCoreThread() 提前讓一個核心執行緒處於活躍狀態,等待執行任務
prestartAllCoreThreads() 提前讓所有核心執行緒處於活躍狀態,等待執行任務
getActiveCount() 獲取執行緒池活躍執行緒數
getCorePoolSize() 獲取執行緒池核心執行緒數
threadPoolExecutor.getQueue() 獲取執行緒池執行緒佇列
getMaximumPoolSize() 獲取執行緒池最大執行緒數
shutdown() 讓執行緒池處於shutdown狀態,不再接收任務,等待所有正在執行中的任務結束後,關閉執行緒池。
shutdownNow() 讓執行緒池處於stop狀態,不再接受任務,嘗試打斷正在執行中的任務,並關閉執行緒池,返回執行緒佇列中的任務。