JAVA併發程式設計:執行緒池 ThreadPoolExecutor
生活
前期追深度,否則會華而不實,後期追廣度,否則會坐井觀天;
前言
在前面,我們已經對Thread有了比較深入的瞭解,並且已經學會了通過new Thread()來建立一個執行緒,並通過start方法來啟動一個執行緒,這種方法非常簡單,同樣也存在弊端:
1、每次通過new Thread()建立物件效能不佳
2、執行緒缺乏統一管理,可能無限建立執行緒,相互競爭,極端情況下回出現OOM
3、無法提供定時執行、定期執行
所以在企業級專案裡,用到執行緒的地方,大多有執行緒池的介入。
ThreadPoolExecutor的成員
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
用來表示執行緒池的狀態以及正在執行的執行緒數。其中高3位描述執行緒池的狀態,低29位用來表示正在執行的執行緒數。當執行緒池建立時,預設的狀態是Running,預設的正在執行的執行緒數為0.
執行緒池的狀態有以下幾種:
RUNNING:允許已經和執行新任務
SHUTDOWN:不允許提交新任務,會執行已經提交到佇列的任務
STOP:不允許提交新任務,也不執行在佇列等待的任務,設定正在執行的任務的中斷標誌位
DITYING:所有任務執行完畢,執行緒池中的工作執行緒為0,等待執行鉤子方法terminated
TERMINATED:鉤子方法執行完畢
注意這裡的狀態指的是執行緒池的狀態,並不是指執行緒池中的執行緒狀態。
執行shutdown方法,可以使執行緒池的狀態由RUNNING轉為SHUTDOWN;
執行shutdownNow方法,可以使執行緒池的狀態由RUNNING轉為STOP
SHUTDOWN和STOP都會先轉為DITYING,再轉為TERMINATED.
以下成員對執行緒池的效能有很大影響,放在構造器裡詳說。
private final BlockingQueue<Runnable> workQueue; private volatile int maximumPoolSize; private volatile long keepAliveTime; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile int corePoolSize; private volatile int maximumPoolSize;
//執行緒池建立至今 最大的執行緒數
private int largestPoolSize;
//已完成的任務數
private long completedTaskCount;
//是否允許在空閒時銷燬核心執行緒
private volatile boolean allowCoreThreadTimeOut;
//工作執行緒的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
ThreadPoolExecutor 建構函式
//所有建構函式最終都呼叫到這個構造器,來看下他的引數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:執行緒池中核心執行緒數的最大值
maximumPoolSize :執行緒池中最多的執行緒數
workQueue:用來快取任務的阻塞佇列
用一個新增新任務的場景來描述上面三者的關係
1、如果沒有空閒執行緒且執行緒數小於核心執行緒數,就建立一個新的執行緒執行
2、如果沒有空閒執行緒且執行緒數等於核心執行緒數,就把任務快取到阻塞佇列
3、如果沒有空閒執行緒且執行緒數小於最大執行緒數且阻塞佇列已滿,則建立一個新的執行緒執行
4、如果沒有空閒執行緒且執行緒數大於最大執行緒數且阻塞佇列已滿,則根據建構函式傳入的拒絕策略做出相應操作。
keepAliveTime:空閒超過這個時間的執行緒會被銷燬【maximumPoolSize為true時,核心執行緒也由這個時間控制銷燬】
unit:上面超時時間的單位
threadFactory:建立執行緒的工廠
handler:拒絕策略
拒絕策略有以下四種
1、AbortPolicy:直接丟擲異常
2、CallerRunsPolicy:用執行緒池提交任務的執行緒去執行 直接run
3、DiscardOldestPolicy:丟棄最早進入佇列沒有執行的執行緒
4、DiscardPolicy:丟棄這個執行緒
執行緒池的提交
執行緒池的submit execute最終都執行下面的方法來提交執行緒
public void execute(Runnable command) {
//判空
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//當工作執行緒數小於核心執行緒時
if (workerCountOf(c) < corePoolSize) {
//新增新的工作執行緒
if (addWorker(command, true))
return;
c = ctl.get();
}
//判斷執行緒池時執行中狀態, 並加入到等待佇列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//判斷是否依舊執行中狀態,如果不是,就移除這個執行緒
if (! isRunning(recheck) && remove(command))
reject(command);
//如果這時候工作執行緒為0,就建立一個執行緒去阻塞佇列獲取任務去執行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//新增核心執行緒以外的工作執行緒來執行。失敗就呼叫拒絕策略。
else if (!addWorker(command, false))
reject(command);
}
-------
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//這個判斷看了好久。。注意,,如果你一定要把兩個判斷條件一起看,真的不太好理解
//我是這麼理解的
//if裡面兩個條件:
//1、rs >= SHUTDOWN
//2、 ! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())
//中間用&連線,也就是1不成立或者2不成立才會不進if裡面,往下走
// 也就是 rs<SHUTDOWN為true 即 執行中
//或者 rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty() 即SHUTDOWN狀態,並且當前沒有提交新任務,並且等待佇列非空
//連起來就得知 新建一個工作執行緒的 條件是 執行中 或者 (SHUTDOWN狀態,等待佇列非空,不提交新任務)
//滿足這些條件,才能往下走
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判斷邊界條件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas新增工作執行緒數
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//狀態有變化,從外層迴圈再來一遍
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//如果執行中 或者 已經SHUTDOWN,但是沒有提交新任務,,這個寫法就友好多了,一目瞭然
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//新增到工作執行緒集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//執行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
執行緒池的執行
執行緒池的執行看Work的run方法,run呼叫到runWork
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//work裡的firstTask獲取佇列裡的task不為空,就往下去執行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//鉤子方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
看一下 getTask方法如何從佇列中得到任務呢
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//允許核心執行緒 超時銷燬 或者 (當前執行緒數超過核心執行緒數)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
(
//當前執行緒數 大於最大執行緒數 或者
//已經超時
)
並且
(
當前工作執行緒數大於1
獲取等待佇列已經空了
)
就嘗試縮減工人
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//如果設定超時了,就要poll超時獲取
//否則用take,無限期阻塞
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
執行緒池的關閉
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 更新狀態
advanceRunState(SHUTDOWN);
//嘗試中斷worker,呼叫interruptIdleWorkers傳入false;
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果執行緒還沒有中斷 並且能夠獲得Worker 的鎖,說明已經執行完了,就可以中斷到
//奇怪,不知道為啥這個worker沒有去繼承可重入鎖,而是寫了一模一樣的程式碼進去。。。。
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
後記
明天來研究下Executors給我們提供的幾個預設的ThreadPoolExecutor