Java Executors 和 ThreadPoolExecutor 執行緒池
Executors提供四種執行緒池,分別為:
- newCachedThreadPool
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。
建立方式: Executors.newCachedThreadPool(); - newFixedThreadPool
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。定長執行緒池的大小最好根據系統資源進行設定,如Runtime.getRuntime().availableProcessors()。
建立方式: Executors.newFixedThreadPool(); - newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。
建立方式: Executors.newScheduledThreadPool (); - newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
建立方式: Executors.newSingleThreadExecutor ();
我們重點看一下newScheduledThreadPool示例, 其他的建立完執行緒池後,使用 threadPool.execute(new Runnable())方式執行任務。
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 表示延遲3秒執行
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
// 表示延遲1秒後每3秒執行一次
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
}
說在前面:
檢視Executors原始碼我們知道,Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實現,也就是上面所說的四種Executors執行緒池,但是 ThreadPoolExecutor 提供的功能遠不止於此。
不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立執行緒池
我們可以在建立 ThreadPoolExecutor 例項時指定活動執行緒的數量,我們也可以限制執行緒池的大小並且建立我們自己的 RejectedExecutionHandler 實現來處理不能適應工作佇列的工作。
下面我們就先了解一下ThreadPoolExecutor,然後在看個示例程式碼。
Executors 原始碼:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
}
一、ThreadPoolExecutor類
java.uitl.concurrent.ThreadPoolExecutor類是執行緒池中最核心的一個類,因此如果要透徹地瞭解Java中的執行緒池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現原始碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的程式碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的原始碼具體實現,發現前面三個構造器都是呼叫的第四個構造器進行的初始化工作。
構造器中各個引數的含義:
- corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
- maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;
- keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;
- unit:引數keepAliveTime的時間單位,有7種取值。TimeUnit.DAYS、TimeUnit.HOURS、TimeUnit.MINUTES、TimeUnit.SECONDS、TimeUnit.MILLISECONDS、TimeUnit.MICROSECONDS、TimeUnit.NANOSECONDS
- workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。 - threadFactory:執行緒工廠,主要用來建立執行緒;
- handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
檢視原始碼我們知道:
1、Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;
2、然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;
3、抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;
4、然後ThreadPoolExecutor繼承了類AbstractExecutorService。
ThreadPoolExecutor類中有幾個非常重要的方法:
1、execute()方法實際上是Executor中宣告的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行。
2、submit()方法是在ExecutorService中宣告的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。
3、shutdown()和shutdownNow()是用來關閉執行緒池的。
4、還有一大波get的方法, 可以獲取與執行緒池相關屬性的方法。
二.剖析執行緒池實現原理
1、執行緒池狀態
volatile int runState; // 前執行緒池的狀態,它是一個volatile變數用來保證執行緒之間的可見性
static final int RUNNING = 0; // 當建立執行緒池後,初始時,執行緒池處於RUNNING狀態
static final int SHUTDOWN = 1; 如果呼叫了shutdown()方法,則執行緒池處於SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢
static final int STOP = 2; // 如果呼叫了shutdownNow()方法,則執行緒池處於STOP狀態,此時執行緒池不能接受新的任務,並且會去嘗試終止正在執行的任務;
static final int TERMINATED = 3; // 當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。
2.任務的執行
private final BlockingQueue<Runnable> workQueue; //任務快取佇列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long keepAliveTime; //執行緒存貨時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心執行緒設定存活時間
private volatile int corePoolSize; //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列)
private volatile int maximumPoolSize; //執行緒池最大能容忍的執行緒數, 當執行緒數大於corePoolSize時,建立新的先執行緒,但是建立新的執行緒數 + corePoolSize不能大於maximumPoolSize
private volatile int poolSize; //執行緒池中當前的執行緒數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //執行緒工廠,用來建立執行緒
private int largestPoolSize; //用來記錄執行緒池中曾經出現過的最大執行緒數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數
在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
從程式碼註釋,我們知道:
- 如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;
- 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務;
- 如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
- 如果執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大於corePoolSize;如果允許為核心池中的執行緒設定存活時間,那麼核心池中的執行緒空閒時間超過keepAliveTime,執行緒也會被終止。
addWorker() 新增任務, 建立Worker, Worker 繼承 AbstractQueuedSynchronizer 實現 Runnable
addWorker()幾個關鍵步驟:
w = new Worker(firstTask);
final Thread t = w.thread; // 從worker取得執行緒
if (workerAdded) {
t.start(); // worker新增成功,執行任務
workerStarted = true;
}
3.執行緒池中的執行緒初始化
預設情況下,建立執行緒池之後,執行緒池中是沒有執行緒的,需要提交任務之後才會建立執行緒。
在實際中如果需要執行緒池建立之後立即建立執行緒,可以通過以下兩個方法辦到:
// 初始化一個核心執行緒;
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
// 初始化所有核心執行緒
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
4.任務快取佇列及排隊策略
workQueue的型別為BlockingQueue,通常可以取下面三種類型:
1)ArrayBlockingQueue:基於陣列的先進先出佇列,此佇列建立時必須指定大小;
2)LinkedBlockingQueue:基於連結串列的先進先出佇列,如果建立時沒有指定此佇列大小,則預設為Integer.MAX_VALUE;
3)synchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務。
5.任務拒絕策略
前面已經講過, 當執行緒池的任務快取佇列已滿並且執行緒池中的執行緒數目達到maximumPoolSize,如果還有任務到來就會採取任務拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
6.執行緒池的關閉
shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完後才終止,但再也不會接受新的任務
shutdownNow():立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務
7.執行緒池容量的動態調整
ThreadPoolExecutor提供了動態調整執行緒池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設定核心池大小
setMaximumPoolSize:設定執行緒池最大能建立的執行緒數目大小
當上述引數從小變大時,ThreadPoolExecutor進行執行緒賦值,還可能立即建立新的執行緒來執行任務。
三、使用示例
我們可以在建立 ThreadPoolExecutor 例項時指定活動執行緒的數量,我們也可以限制執行緒池的大小並且建立我們自己的 RejectedExecutionHandler 實現來處理不能適應工作佇列的工作。
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}
}
ThreadPoolExecutor 提供了一些方法,我們可以使用這些方法來查詢 executor 的當前狀態,執行緒池大小,活動執行緒數量以及任務數量。因此我是用來一個監控執行緒在特定的時間間隔內列印 executor 資訊。
MyMonitorThread.java
public class MyMonitorThread implements Runnable
{
private ThreadPoolExecutor executor;
private int seconds;
private boolean run=true;
public MyMonitorThread(ThreadPoolExecutor executor, int delay)
{
this.executor = executor;
this.seconds=delay;
}
public void shutdown(){
this.run=false;
}
@Override
public void run()
{
while(run){
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated()));
try {
Thread.sleep(seconds*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
這裡是使用 ThreadPoolExecutor 的執行緒池實現例子。
WorkerPool.java
public class WorkerPool {
public static void main(String args[]) throws InterruptedException{
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
//submit work to the thread pool
for(int i=0; i<10; i++){
executorPool.execute(new WorkerThread("cmd"+i));
}
Thread.sleep(30000);
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
Thread.sleep(5000);
monitor.shutdown();
}
}
注意在初始化 ThreadPoolExecutor 時,我們保持初始池大小為 2,最大池大小為 4 而工作佇列大小為 2。因此如果已經有四個正在執行的任務而此時分配來更多工的話,工作佇列將僅僅保留他們(新任務)中的兩個,其他的將會被 RejectedExecutionHandlerImpl 處理。
四.合理配置執行緒池的大小
遵循兩原則:
1、如果是CPU密集型任務,就需要儘量壓榨CPU,參考值可以設為 NCPU+1
2、如果是IO密集型任務,參考值可以設定為2*NCPU
當然,這只是一個參考值,具體的設定還需要根據實際情況進行調整,比如可以先將執行緒池大小設定為參考值,再觀察任務執行情況和系統負載、資源利用率來進行適當調整。