功能:Java執行緒池
Java執行緒池
一、介紹
執行緒池,顧名思義,這是管理一堆執行緒而出現的物件。與資料庫的連線池一致,它的出現解決了執行緒的頻繁建立和銷燬,從而浪費大量資源的問題。
所以,執行緒池中有提前建立好的執行緒,使用時直接分配獲取,使用完再由執行緒池管理是否銷燬。
優點
- 降低資源消耗,也就是不需要重複多次的建立執行緒
- 更好的管理執行緒
- 比如可以獲取當前執行的執行緒是什麼
- 還在等待執行的任務有什麼
二、使用執行緒池
在JDK5起提供了執行緒池的物件,ExecutorService
和Executors
其中,ExecutorService
和它的子類ThreadPoolExecutor
是執行緒池的關鍵
而Executors
是對應的工具類,裡面有些工廠方法可以快速建立執行緒池
檢視ThreadPoolExecutor
的構造方法
public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {} }
引數 | 說明 |
---|---|
corePoolSize | 核心執行緒數。就算目前空閒,也不會回收這幾個執行緒 |
maximumPoolSize | 最大執行緒數。當前執行緒池可以容納的最大執行緒數量 |
keepAliveTime | 執行緒保持存活時間。如果執行緒空間時間到達,將會進行銷燬(除了核心執行緒) |
unit | 與keepAliveTime 一起使用,僅是個時間單位 |
workQueue | 工作等待佇列。當執行緒池所有的執行緒都繁忙執行時,新新增的執行任務會暫時保留至此佇列 |
threadFactory | 建立執行緒的執行緒工廠 |
handler | 拒絕策略。當佇列滿了後,還有執行任務進入時的策略 |
workQueue
引數需要傳入一個BlockingQueue
,這是個阻塞佇列。BlockingQueue內部使用兩條佇列,允許兩個執行緒同時向佇列一個儲存,一個取出操作。在保證併發安全的同時,提高了佇列的存取效率,不能傳入空物件,可設定容量大小,也可以不設定容量大小,那麼它的容量就是Integer.MAX_VALUE
。常用的幾種實現類
類 | 說明 |
---|---|
ArrayBlockingQueue | 規定容量大小的阻塞佇列 |
LinkedBlockingQueue | 既可以規定容量大小,也可以不規定的阻塞佇列 |
SynchronizedQueue | 一個特殊的佇列,生產消費必須交替完成的佇列 生產一個元素後,必須要有進行消費後,才能繼續往佇列內生產元素 |
handler
拒絕策略
當執行緒池指定的佇列容量滿了時,將執行哪種拒絕任務的策略
策略類 | 說明 |
---|---|
AbortPolicy | 預設,不執行新任務,直接丟擲異常,提示執行緒池已滿 |
DiscardPolicy | 不執行新任務,也不丟擲異常 |
DiscardOldestPolicy | 它丟棄最老的未處理請求,然後重試執行,除非執行程式被關閉,在這種情況下任務被丟棄。 |
CallerRunsPolicy | 直接在外層呼叫者的執行緒中呼叫新任務 |
1)小試牛刀
package com.banmoon.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
executorService.execute(new MyRunnable());
// lambda表示式
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
// 關閉執行緒池,如果不關閉,執行緒池將一直存在,池子內保留著核心執行緒,等待著呼叫
executorService.shutdown();
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
2)Executors
工具類
關於此的三個相關方法原始碼,其中還有一些他們的過載,這邊就不細細講了。
這些工具類方法,主要是快速建立ThreadPoolExecutor
物件的方法,只是它們的引數各有所不同
public class Executors {
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
方法 | 引數說明 | 效果 |
---|---|---|
newCachedThreadPool | 核心執行緒數為0 最大執行緒數已調到Integer.MAX_VALUE |
每提交一個執行緒任務,都將新建立一個新的執行緒來執行 如果需要執行的任務很多,這有可能會導致CPU100%的問題 |
newFixedThreadPool | 核心執行緒數和最大執行緒數一致 但佇列長度為Integer.MAX_VALUE |
提交的任務將正常交給池子中的執行緒執行,執行完成也不會銷燬,等待執行新的任務 如果執行的任務很多,佇列會一直新增任務等待執行,可能會造成記憶體溢位的問題 |
newSingleThreadExecutor | 核心執行緒數和最大執行緒數都為1 但佇列長度為Integer.MAX_VALUE |
和newFixedThreadPool 類似,但池子中只有一個執行緒 |
根據需要來進行使用合適的執行緒池,測試下他們的執行方式和快慢
package com.banmoon.pool;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Demo2 {
public static void main(String[] args) {
ExecutorService executorService1 = Executors.newCachedThreadPool();
ExecutorService executorService2 = Executors.newFixedThreadPool(10);
ExecutorService executorService3 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService1.execute(new MyDemo2(i));
// executorService2.execute(new MyDemo2(i));
// executorService3.execute(new MyDemo2(i));
}
executorService1.shutdown();
executorService2.shutdown();
executorService3.shutdown();
}
}
class MyDemo2 implements Runnable {
private Integer i;
public MyDemo2(Integer i) {
this.i = i;
}
@Override
public void run() {
System.out.println(StrUtil.format("{}:{},時間:{}", Thread.currentThread().getName(), i, DateUtil.now()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
newCachedThreadPool
執行結果可以看到,一共有100個執行緒被創建出來
newFixedThreadPool
執行結果,執行的永遠都是那幾個固定的執行緒,這裡我們指定了10個執行緒,所以列印也是10個為一批來進行的。
newSingleThreadExecutor
執行結果,從頭到尾就只有一個執行緒在執行
3)執行緒工廠
雖然有預設的執行緒工廠,但如果有需要進行處理的話,還是得記錄一下
package com.banmoon.pool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Demo3 {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST"));
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
class MyThreadFactory implements ThreadFactory{
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private String poolName;
MyThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
String threadName = poolName + "-" + threadNumber.getAndIncrement();
Thread t = new Thread(group, r, threadName, 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
執行結果
4)拒絕策略
拒絕策略沒什麼好講的,平常在使用時,注意下容量的大小,以及使用的策略。自己需要執行的任務數量多少,會不會照成記憶體溢位等,從這幾個方面入手,選擇最適合業務的佇列容量和拒絕策略。
策略類 | 說明 |
---|---|
AbortPolicy | 預設,不執行新任務,直接丟擲異常,提示執行緒池已滿 |
DiscardPolicy | 不執行新任務,也不丟擲異常 |
DiscardOldestPolicy | 它丟棄最老的未處理請求,然後重試執行,除非執行程式被關閉,在這種情況下任務被丟棄。 |
CallerRunsPolicy | 直接在外層呼叫者的執行緒中呼叫新任務 |
演示CallerRunsPolicy
,會在呼叫者的執行緒中,執行超出容量的任務
package com.banmoon.pool;
import java.util.concurrent.*;
public class Demo4 {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 100; i++) {
executorService.execute(new MyDemo4(i));
}
executorService.shutdown();
}
}
class MyDemo4 implements Runnable{
private Integer i;
public MyDemo4(Integer i) {
this.i = i;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":" + i);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
執行結果,上述執行緒池指定了最大執行緒數為20,佇列容量為20。所以當執行第41個任務時,佇列滿了,將由呼叫者的執行緒來執行這個任務,此處是主執行緒
三、其他
1)執行任務的優先順序
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
// 判斷是否為空
if (command == null)
throw new NullPointerException();
// 判斷當前正在執行的執行緒數是否小於核心執行緒數
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 新增任務至執行緒執行,成功新增則結束
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果核心執行緒都有在執行,將任務放至佇列中
if (isRunning(c) && workQueue.offer(command)) {
// 如果成功推入佇列,將再次檢查執行緒狀態,有執行緒死亡則將當前任務新增至執行緒執行
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果佇列推入任務失敗了,那將直接新增至執行緒執行
else if (!addWorker(command, false))
// 如果任務新增至執行緒失敗,則將進行拒絕策略
reject(command);
}
/**
* 會從執行緒工廠獲取執行緒,並新增執行任務
* @param firstTask 執行的任務
* @param core 是否可以新增至核心執行緒
* @return true:成功新增至執行緒執行
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// ...
}
}
四、最後
執行緒池這東西乾貨還是挺多的,還有挺多沒有整理完。比如說addWorker
方法,執行緒池的執行排程等
後續有什麼新的理解繼續補上,未完待續
入我相思門,知我相思苦, 長相思兮長相憶,短相思兮無窮極, 早知如此絆人心,何如當初莫相識。歡迎來登入我的個人部落格