深入學習java執行緒池
我們都是通過new Thread來建立一個執行緒,由於執行緒的建立和銷燬都需要消耗一定的CPU資源,所以在高併發下這種建立執行緒的方式將嚴重影響程式碼執行效率。而執行緒池的作用就是讓一個執行緒執行結束後不馬上銷燬,繼續執行新的任務,這樣就節省了不斷建立執行緒和銷燬執行緒的開銷。
ThreadPoolExecutor
建立Java執行緒池最為核心的類為ThreadPoolExecutor:
它提供了四種建構函式來建立執行緒池,其中最為核心的建構函式如下所示:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
複製程式碼
這7個引數的含義如下:
-
corePoolSize 執行緒池核心執行緒數。即執行緒池中保留的執行緒個數,即使這些執行緒是空閒的,也不會被銷燬,除非通過ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法開啟了核心執行緒的超時策略;
-
maximumPoolSize 執行緒池中允許的最大執行緒個數;
-
keepAliveTime 用於設定那些超出核心執行緒數量的執行緒的最大等待時間,超過這個時間還沒有新任務的話,超出的執行緒將被銷燬;
-
unit 超時時間單位;
-
workQueue 執行緒佇列。用於儲存通過execute方法提交的,等待被執行的任務;
-
threadFactory 執行緒建立工程,即指定怎樣建立執行緒;
-
handler 拒絕策略。即指定當執行緒提交的數量超出了maximumPoolSize後,該使用什麼策略處理超出的執行緒。
在通過這個構造方法建立執行緒池的時候,這幾個引數必須滿足以下條件,否則將丟擲IllegalArgumentException異常:
-
corePoolSize不能小於0;
-
keepAliveTime不能小於0;
-
maximumPoolSize 不能小於等於0;
-
maximumPoolSize不能小於corePoolSize;
此外,workQueue、threadFactory和handler不能為null,否則將丟擲空指標異常。
下面舉些例子來深入理解這幾個引數的含義。
使用上面的構造方法建立一個執行緒池:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,2,10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1),(ThreadFactory) Thread::new,new ThreadPoolExecutor.AbortPolicy());
System.out.println("執行緒池建立完畢");
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活躍執行緒個數 " + threadPoolExecutor.getActiveCount());
System.out.println("核心執行緒個數 " + threadPoolExecutor.getCorePoolSize());
System.out.println("佇列執行緒個數 " + threadPoolExecutor.getQueue().size());
System.out.println("最大執行緒數 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
複製程式碼
上面的程式碼建立了一個核心執行緒數量為1,允許最大執行緒數量為2,最大活躍時間為10秒,執行緒佇列長度為1的執行緒池。
假如我們通過execute方法向執行緒池提交1個任務,看看結果如何:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,new ThreadPoolExecutor.AbortPolicy());
System.out.println("執行緒池建立完畢");
threadPoolExecutor.execute(() -> sleep(100));
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活躍執行緒個數 " + threadPoolExecutor.getActiveCount());
System.out.println("核心執行緒個數 " + threadPoolExecutor.getCorePoolSize());
System.out.println("佇列執行緒個數 " + threadPoolExecutor.getQueue().size());
System.out.println("最大執行緒數 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
複製程式碼
ThreadPoolExecutor的execute和submit方法都可以向執行緒池提交任務,區別是,submit方法能夠返回執行結果,返回值型別為Future
sleep方法程式碼:
private static void sleep(long value) {
try {
System.out.println(Thread.currentThread().getName() + "執行緒執行sleep方法");
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製程式碼
啟動程式,控制檯輸出如下:
執行緒池核心執行緒數量為1,通過execute提交了一個任務後,由於核心執行緒是空閒的,所以任務被執行了。由於這個任務的邏輯是休眠100秒,所以在這100秒內,執行緒池的活躍執行緒數量為1。此外,因為提交的任務被核心執行緒執行了,所以並沒有執行緒需要被放到執行緒佇列裡等待,執行緒佇列長度為0。
假如我們通過execute方法向執行緒池提交2個任務,看看結果如何:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製程式碼
執行緒池核心執行緒數量為1,通過execute提交了2個任務後,一開始核心執行緒是空閒的,Thread-0被執行。由於這個任務的邏輯是休眠100秒,所以在這100秒內,執行緒池的活躍執行緒數量為1。因為核心執行緒數量為1,所以另外一個任務在這100秒內不能被執行,於是被放到執行緒佇列裡等待,執行緒佇列長度為1。
假如我們通過execute方法向執行緒池提交3個任務,看看結果如何:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製程式碼
這三個任務都是休眠100秒,所以核心執行緒池中第一個任務正在被執行,第二個任務被放入到了執行緒佇列。而當第三個任務被提交進來時,執行緒佇列滿了(我們定義的長度為1),由於該執行緒池允許的最大執行緒數量為2,所以執行緒池還可以再建立一個執行緒來執行另外一個任務,於是乎之前線上程佇列裡的執行緒被取出執行(FIFO),第三個任務被放入到了執行緒佇列。
改變第二個和第三個任務的睡眠時間,觀察輸出:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
複製程式碼
第二個任務提交5秒後,任務執行完畢,所以執行緒佇列裡的任務被執行,於是佇列執行緒個數為0,活躍執行緒數量為2(第一個和第三個任務)。再過5秒後,第三個任務執行完畢,於是活躍執行緒數量為1(第一個100秒還沒執行完畢)。
在第三個任務結束的瞬間,我們觀察執行緒快照:
可以看到,執行緒池中有兩個執行緒,Thread-0在執行第一個任務(休眠100秒,還沒結束),Thread-1執行完第三個任務後並沒有馬上被銷燬。過段時間後(10秒鐘後)再觀察執行緒快照:
可以看到,Thread-1這個執行緒被銷燬了,因為我們在建立執行緒池的時候,指定keepAliveTime 為10秒,10秒後,超出核心執行緒池執行緒外的那些執行緒將被銷燬。
假如一次性提交4個任務,看看會怎樣:
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
複製程式碼
因為我們設定的拒絕策略為AbortPolicy,所以最後提交的那個任務直接被拒絕了。更多拒絕策略下面會介紹到。
關閉執行緒池
執行緒池包含以下幾個狀態:
當執行緒池中所有任務都處理完畢後,執行緒並不會自己關閉。我們可以通過呼叫shutdown和shutdownNow方法來關閉執行緒池。兩者的區別在於:
shutdown方法將執行緒池置為shutdown狀態,拒絕新的任務提交,但執行緒池並不會馬上關閉,而是等待所有正在折行的和執行緒佇列裡的任務都執行完畢後,執行緒池才會被關閉。所以這個方法是平滑的關閉執行緒池。
shutdownNow方法將執行緒池置為stop狀態,拒絕新的任務提交,中斷正在執行的那些任務,並且清除執行緒佇列裡的任務並返回。所以這個方法是比較“暴力”的。
舉兩個例子觀察下兩者的區別:
shutdown例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,4,new ArrayBlockingQueue<>(2),new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.shutdown();
System.out.println("已經執行了執行緒池shutdown方法");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程中被打斷" + e.getMessage());
}
}
}
複製程式碼
啟動程式,控制檯輸出如下:
可以看到,雖然在任務都被提交後馬上執行了shutdown方法,但是並不會馬上關閉執行緒池,而是等待所有被提交的任務都執行完了才關閉。
shutdownNow例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 馬上關閉,並返回還未被執行的任務
System.out.println(runnables);
System.out.println("已經執行了執行緒池shutdownNow方法");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程中被打斷" + e.getMessage());
}
}
}
複製程式碼
啟動程式,控制檯輸出如下:
可以看到,在執行shutdownNow方法後,執行緒池馬上就被關閉了,正在執行中的兩個任務被打斷,並且返回了執行緒佇列中等待被執行的兩個任務。
通過上面兩個例子我們還可以看到shutdown和shutdownNow方法都不是阻塞的。常與shutdown搭配的方法有awaitTermination。
awaitTermination方法接收timeout和TimeUnit兩個引數,用於設定超時時間及單位。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。該方法是阻塞的:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.shutdown();
boolean isShutdown = threadPoolExecutor.awaitTermination(3,TimeUnit.SECONDS);
if (isShutdown) {
System.out.println("執行緒池在3秒內成功關閉");
} else {
System.out.println("等了3秒還沒關閉,不等了╰(‵□′)╯");
}
System.out.println("------------");
}
static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程中被打斷" + e.getMessage());
}
}
}
複製程式碼
啟動程式輸出如下:
4大拒絕策略
當執行緒池無法再接收新的任務的時候,可採取如下四種策略:
CallerRunsPolicy CallerRunsPolicy策略:由呼叫執行緒處理該任務:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,3,new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.execute(new shortTask("任務1"));
threadPoolExecutor.execute(new longTask("任務2"));
threadPoolExecutor.execute(new longTask("任務3"));
threadPoolExecutor.execute(new shortTask("任務4"));
threadPoolExecutor.execute(new shortTask("任務5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程中被打斷" + e.getMessage());
}
}
}
複製程式碼
上面的執行緒池最多隻能一次性提交4個任務,第5個任務提交後會被拒絕策略處理。啟動程式輸出如下:
可以看到,第5個提交的任務由呼叫執行緒(即main執行緒)處理該任務。
AbortPolicy AbortPolicy策略:丟棄任務,並丟擲RejectedExecutionException異常。前面的例子就是使用該策略,所以不再演示。
DiscardOldestPolicy DiscardOldestPolicy策略:丟棄最早被放入到執行緒佇列的任務,將新提交的任務放入到執行緒佇列末端:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolExecutor.execute(new shortTask("任務1"));
threadPoolExecutor.execute(new longTask("任務2"));
threadPoolExecutor.execute(new longTask("任務3"));
threadPoolExecutor.execute(new shortTask("任務4"));
threadPoolExecutor.execute(new shortTask("任務5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程中被打斷" + e.getMessage());
}
}
}
複製程式碼
啟動程式輸出如下:
可以看到最後提交的任務被執行了,而第3個任務是第一個被放到執行緒佇列的任務,被丟棄了。
DiscardPolicy DiscardPolicy策略:直接丟棄新的任務,不拋異常:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,new ThreadPoolExecutor.DiscardPolicy());
threadPoolExecutor.execute(new shortTask("任務1"));
threadPoolExecutor.execute(new longTask("任務2"));
threadPoolExecutor.execute(new longTask("任務3"));
threadPoolExecutor.execute(new shortTask("任務4"));
threadPoolExecutor.execute(new shortTask("任務5"));
threadPoolExecutor.shutdown();
}
static class shortTask implements Runnable {
private String name;
public shortTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "執行shortTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("shortTask執行過程中被打斷" + e.getMessage());
}
}
}
static class longTask implements Runnable {
private String name;
public longTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "執行longTask-name-" + name + "完畢");
} catch (InterruptedException e) {
System.err.println("longTask執行過程中被打斷" + e.getMessage());
}
}
}
複製程式碼
啟動程式,輸出如下:
第5個任務直接被拒絕丟棄了,而沒有丟擲任何異常。
執行緒池工廠方法
除了使用ThreadPoolExecutor的構造方法建立執行緒池外,我們也可以使用Executors提供的工廠方法來建立不同型別的執行緒池:
newFixedThreadPool 檢視newFixedThreadPool方法原始碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
複製程式碼
可以看到,通過newFixedThreadPool建立的是一個固定大小的執行緒池,大小由nThreads引數指定,它具有如下幾個特點:
因為corePoolSize和maximumPoolSize的值都為nThreads,所以執行緒池中執行緒數量永遠等於nThreads,不可能新建除了核心執行緒數的執行緒來處理任務,即keepAliveTime實際上在這裡是無效的。
LinkedBlockingQueue是一個無界佇列(最大長度為Integer.MAX_VALUE),所以這個執行緒池理論是可以無限的接收新的任務,這就是為什麼上面沒有指定拒絕策略的原因。
newCachedThreadPool 檢視newCachedThreadPool方法原始碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,new SynchronousQueue<Runnable>());
}
複製程式碼
這是一個理論上無限大小的執行緒池:
核心執行緒數為0,SynchronousQueue佇列是沒有長度的佇列,所以當有新的任務提交,如果有空閒的還未超時的(最大空閒時間60秒)執行緒則執行該任務,否則新增一個執行緒來處理該任務。
因為執行緒數量沒有限制,理論上可以接收無限個新任務,所以這裡也沒有指定拒絕策略。
newSingleThreadExecutor 檢視newSingleThreadExecutor原始碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,1,new LinkedBlockingQueue<Runnable>()));
}
複製程式碼
核心執行緒數和最大執行緒數都為1,每次只能有一個執行緒處理任務。
LinkedBlockingQueue佇列可以接收無限個新任務。
newScheduledThreadPool 檢視newScheduledThreadPool原始碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize,NANOSECONDS,new DelayedWorkQueue());
}
複製程式碼
所以newScheduledThreadPool理論是也是可以接收無限個任務,DelayedWorkQueue也是一個無界佇列。
使用newScheduledThreadPool建立的執行緒池除了可以處理普通的Runnable任務外,它還具有排程的功能:
1.延遲指定時間後執行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲5秒執行
executorService.schedule(() -> System.out.println("hello"),5,TimeUnit.SECONDS);
複製程式碼
2.按指定的速率執行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延遲1秒執行,然後每5秒執行一次
executorService.scheduleAtFixedRate(
() -> System.out.println(LocalTime.now()),TimeUnit.SECONDS
);
複製程式碼
3.按指定的時延執行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
() -> System.out.println(LocalTime.now()),TimeUnit.SECONDS
);
複製程式碼
乍一看,scheduleAtFixedRate和scheduleWithFixedDelay沒啥區別,實際它們還是有區別的:
scheduleAtFixedRate按照固定速率執行任務,比如每5秒執行一個任務,即使上一個任務沒有結束,5秒後也會開始處理新的任務;
scheduleWithFixedDelay按照固定的時延處理任務,比如每延遲5秒執行一個任務,無論上一個任務處理了1秒,1分鐘還是1小時,下一個任務總是在上一個任務執行完畢後5秒鐘後開始執行。
對於這些執行緒池工廠方法的使用,阿里巴巴程式設計規程指出:
因為這幾個執行緒池理論是都可以接收無限個任務,所以這就有記憶體溢位的風險。實際上只要我們掌握了ThreadPoolExecutor建構函式7個引數的含義,我們就可以根據不同的業務來創建出符合需求的執行緒池。一般執行緒池的建立可以參考如下規則:
IO密集型任務,執行緒池執行緒數量可以設定為2 X CPU核心數;
計算密集型任務,執行緒池執行緒數量可以設定為CPU核心數 + 1。
一些API的用法 ThreadPoolExecutor提供了幾個判斷執行緒池狀態的方法:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();
System.out.println("執行緒池為shutdown狀態:" + threadPoolExecutor.isShutdown());
System.out.println("執行緒池正在關閉:" + threadPoolExecutor.isTerminating());
System.out.println("執行緒池已經關閉:" + threadPoolExecutor.isTerminated());
threadPoolExecutor.awaitTermination(6,TimeUnit.SECONDS);
System.out.println("執行緒池已經關閉" + threadPoolExecutor.isTerminated());
}
複製程式碼
程式輸出如下:
前面我們提到,執行緒池核心執行緒即使是空閒狀態也不會被銷燬,除非使用allowCoreThreadTimeOut設定了允許核心執行緒超時:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.allowCoreThreadTimeOut(true);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任務執行完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
複製程式碼
程式輸出如下所示:
5秒後任務執行完畢,核心執行緒處於空閒的狀態。因為通過allowCoreThreadTimeOut方法設定了允許核心執行緒超時,所以3秒後(keepAliveTime設定為3秒),核心執行緒被銷燬。核心執行緒被銷燬後,執行緒池也就沒有作用了,於是就自動關閉了。
值得注意的是,如果一個執行緒池呼叫了allowCoreThreadTimeOut(true)方法,那麼它的keepAliveTime不能為0。
ThreadPoolExecutor提供了一remove方法,檢視其原始碼:
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
複製程式碼
可看到,它刪除的是執行緒佇列中的任務,而非正在被執行的任務。舉個例子:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1,new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任務執行完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Runnable r = () -> System.out.println("看看我是否會被刪除");
threadPoolExecutor.execute(r);
threadPoolExecutor.remove(r);
threadPoolExecutor.shutdown();
}
複製程式碼
執行程式,輸出如下:
可看到任務並沒有被執行,已經被刪除,因為唯一一個核心執行緒已經在執行任務了,所以後提交的這個任務被放到了執行緒佇列裡,然後通過remove方法刪除。
預設情況下,只有當往執行緒池裡提交了任務後,執行緒池才會啟動核心執行緒處理任務。我們可以通過呼叫prestartCoreThread方法,讓核心執行緒即使沒有任務提交,也處於等待執行任務的活躍狀態:
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,new ThreadPoolExecutor.AbortPolicy()
);
System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活躍執行緒數: " + threadPoolExecutor.getActiveCount());
}
複製程式碼
程式輸出如下所示:
該方法返回boolean型別值,如果所以核心執行緒都啟動了,返回false,反之返回true。
還有一個和它類似的prestartAllCoreThreads方法,它的作用是一次性啟動所有核心執行緒,讓其處於活躍地等待執行任務的狀態。
ThreadPoolExecutor的invokeAny方法用於隨機執行任務集合中的某個任務,並返回執行結果,該方法是同步方法:
public static void main(String[] args) throws InterruptedException,ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,new ThreadPoolExecutor.AbortPolicy()
);
// 任務集合
List<Callable<Integer>> tasks = IntStream.range(0,4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
// 隨機執行結果
Integer result = threadPoolExecutor.invokeAny(tasks);
System.out.println("-------------------");
System.out.println(result);
threadPoolExecutor.shutdownNow();
}
複製程式碼
啟動程式,輸出如下:
ThreadPoolExecutor的invokeAll則是執行任務集合中的所有任務,返回Future集合:
public static void main(String[] args) throws InterruptedException,new ThreadPoolExecutor.AbortPolicy()
);
List<Callable<Integer>> tasks = IntStream.range(0,4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
futureList.stream().map(f->{
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}).forEach(System.out::println);
threadPoolExecutor.shutdownNow();
}
複製程式碼
輸出如下:
總結下這些方法:
方法 | 描述 |
---|---|
allowCoreThreadTimeOut(boolean value) | 是否允許核心執行緒空閒後超時,是的話超時後核心執行緒將銷燬,執行緒池自動關閉 |
awaitTermination(long timeout,TimeUnit unit) | 阻塞當前執行緒,等待執行緒池關閉,timeout用於指定等待時間。 |
execute(Runnable command) | 向執行緒池提交任務,沒有返回值 |
submit(Runnable task) | 向執行緒池提交任務,返回Future |
isShutdown() | 判斷執行緒池是否為shutdown狀態 |
isTerminating() | 判斷執行緒池是否正在關閉 |
isTerminated() | 判斷執行緒池是否已經關閉 |
remove(Runnable task) | 移除執行緒佇列中的指定任務 |
prestartCoreThread() | 提前讓一個核心執行緒處於活躍狀態,等待執行任務 |
prestartAllCoreThreads() | 提前讓所有核心執行緒處於活躍狀態,等待執行任務 |
getActiveCount() | 獲取執行緒池活躍執行緒數 |
getCorePoolSize() | 獲取執行緒池核心執行緒數 |
threadPoolExecutor.getQueue() | 獲取執行緒池執行緒佇列 |
getMaximumPoolSize() | 獲取執行緒池最大執行緒數 |
shutdown() | 讓執行緒池處於shutdown狀態,不再接收任務,等待所有正在執行中的任務結束後,關閉執行緒池。 |
shutdownNow() | 讓執行緒池處於stop狀態,不再接受任務,嘗試打斷正在執行中的任務,並關閉執行緒池,返回執行緒佇列中的任務。 |