Java高併發程式設計(十一):Java中執行緒池
在開發過程中,合理地使用執行緒池能夠帶來3個好處。
- 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
- 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,
還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。
1. 執行緒池的使用原理
從圖中可以看出,當提交一個新任務到執行緒池時,執行緒池的處理流程如下。
- 執行緒池判斷核心執行緒池裡的執行緒是否都在執行任務。如果不是,則建立一個新的工作
執行緒來執行任務。如果核心執行緒池裡的執行緒都在執行任務,則進入下個流程。- 執行緒池判斷工作佇列是否已經滿。如果工作佇列沒有滿,則將新提交的任務儲存在這
個工作佇列裡。如果工作佇列滿了,則進入下個流程。- 執行緒池判斷執行緒池的執行緒是否都處於工作狀態。如果沒有,則建立一個新的工作執行緒
來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
ThreadPool的執行示意圖:
ThreadPoolExecutor執行execute方法分下面4種情況。
- 如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟需要獲取全域性鎖)。
- 如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。
- 如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執行這一步驟需要獲取全域性鎖)。
- 如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫
RejectedExecutionHandler.rejectedExecution()方法。
下面我們從原始碼的角度分析一下ThreadPoolExecutor的execute()方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果執行緒數小於基本執行緒數,則建立執行緒並執行當前任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如執行緒數大於等於基本執行緒數或執行緒建立失敗,則將當前任務放到工作佇列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//丟擲RejectedExecutionException異常
reject(command);
//如果執行緒池不處於執行中或任務無法放入佇列中,並且當前執行緒的數量小於最大允許的執行緒數量,則建立一個工作執行緒執行任務。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
//丟擲RejectedExecutionException異常
reject(command);
}
工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務
後,還會迴圈獲取工作佇列裡的任務來執行。我們可以從Worker類的run()方法裡看到這點。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
ThreadPoolExecutor中執行緒執行任務的示意圖 如下:
2. 執行緒池的使用
2.1 執行緒池的建立
我們可以通過ThreadPoolExecutor來建立一個執行緒池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,milliseconds,runnableTaskQueue, handler);
建立一個執行緒池時需要輸入幾個引數,如下。
- corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。
- runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
- ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列
- LinkedBlockingQueue:一個基於連結串列結構的有界阻塞佇列
- SynchronousQueue:一個不儲存元素的阻塞佇列。
- PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
- maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。
- ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder
- RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。
- AbortPolicy: 直接丟擲異常
- CallerRunsPolicy:只用呼叫者所線上程來執行任務
- DiscardOldestPolicy:丟棄最近的一個任務,並執行當前執行緒。
- DiscardPolicy:不處理丟掉
- keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。
- TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒
2.2 向執行緒池提交任務
可以使用兩個方法向執行緒池提交任務,分別為execute()和submit()方法。
- execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。
- submit()方法用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。
2.3 關閉執行緒池
關閉執行緒池有兩種方式,一種是呼叫shutdown()方法還有一種是呼叫shutdownNow()方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。
但是他們之間也存在一些區別:
- shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表
- shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。
當呼叫shutdown()和shutdownNow()方法之後,執行緒池執行isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫isTerminaed方法會返回true。
2.4 合理地配置執行緒池
要合理配置執行緒池,需要先分析任務的特性:
- 任務的性質:CPU密集型任務、IO密集型任務 和 混合型任務。
- 優先順序:高、中、低。
- 執行時間:長、中、短。
- 任務的依賴性:是否依賴其他的系統資源,如資料庫連線等。
配置方案:
-
根據任務性質,可做如下配置:
-
CPU密集型任務
儘量使用小規模執行緒池,一般執行緒數量:CPU核心數 + 1。
因為CPU密集型任務的CPU利用率很高,過多執行緒導致上下文切換過多,造成額外開銷。
-
IO密集型任務
使用稍大的執行緒池,儘量配置多的執行緒,一般執行緒配置數量:CPU核心數 * 2。
這種任務CPU利用率不是很高,因此可以讓CPU在等待IO的時候去處理別的任務。
-
混合型任務
任務分解成:CPU密集 和 IO密集 型任務,然後分別用不同規模執行緒池處理。
注意:只要分解後的兩個任務執行時間相差不大,會比原來序列執行吞吐量高。
但是,如果相差時間不大,先執行完的任務 要等待 後執行完的任務,最終執行時間依然取決於 後執行完的任務,還要加上任務拆分、合併的開銷,得不償失。
-
-
優先順序,可以使用優先順序佇列PriorityBlockingQueue來處理,它會讓優先順序高的任務先執行。
-
執行時間不同的任務,可以交給不同規模的執行緒池來處理,或使用優先順序佇列,讓短任務先執行。
-
資料庫依賴性任務,執行緒提交SQL等待資料庫返回結果,等待時間越長,則CPU空閒時間越長。因此,可以將執行緒數量設定較大,能更好利用CPU。
使用執行緒池建議:建議使用有界佇列,可以增加系統穩定性 和 預警能力。
3.5 執行緒池的監控
如果系統大量使用執行緒池,則有必要對執行緒池進行監控。
可以通過執行緒池提供的引數進行監控:
- taskCount:需要執行的任務數量
- completedTaskCount:已完成的任務數量
- largestPoolSize:執行緒池中曾經建立過的最大執行緒數量
- getPoolSize:執行緒池的執行緒數量,只增不減
- getActiveCount:獲取活動的執行緒數量
也可繼承執行緒池來自定義執行緒池進行監控,重寫beforeExecute()
、afterExecute()
和terminated()
方法進行監控。