執行緒池執行緒複用的原理
執行緒池執行緒複用的原理
前言
執行緒池最大的作用就是複用執行緒,讓一個執行緒執行不同的任務,減少反覆地建立執行緒帶來的系統開銷。那麼執行緒池執行緒複用的原理是什麼呢?
之前面試被問到執行緒池複用的原理時,由於對原始碼不甚瞭解,只停留在原理層(八股文層)顯然時不夠的,因此這篇文章將深入原始碼,看看執行緒複用到底時如何實現的。最後,我們將自定義一個執行緒池。
一、執行緒池核心屬性】
首先我們看看執行緒池的核心屬性,這也是面試中經常被問到的問題。
public class ThreadPoolExecutor extends AbstractExecutorService { //執行緒狀態,高3為表示執行緒池狀態,低29位表示執行緒數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //Lock鎖 private final ReentrantLock mainLock = new ReentrantLock(); //條件變數 private final Condition termination = mainLock.newCondition(); //執行緒集合 private final HashSet<Worker> workers = new HashSet<>(); //核心執行緒數 private volatile int corePoolSize; //最大執行緒數 private volatile int maximumPoolSize; //阻塞佇列 private final BlockingQueue<Runnable> workQueue; //非核心執行緒存活時間 private volatile long keepAliveTime; //執行緒工廠,所有執行緒使用執行緒工廠建立 private volatile ThreadFactory threadFactory; //拒絕策略,是一個函式式介面 private volatile RejectedExecutionHandler handler; }
二、execute原始碼
我們可以從execute() 方法,檢視執行緒複用的原理
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; 2^29-1 //c & COUNT_MASK; 返回低29位,也就是執行緒樹 private static int workerCountOf(int c) { return c & COUNT_MASK; } public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //得到執行緒數,判斷<核心執行緒數 if (workerCountOf(c) < corePoolSize) { //線上程池中建立一個執行緒並執行傳入的任務,成功返回true,否則返回false if (addWorker(command, true)) return; c = ctl.get(); } //如果執行緒池還是Running狀態且核心執行緒已滿,此時以核心執行緒數為閾值,將command加入阻塞佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //如果執行緒池不是Running狀態,且移除成功 if (! isRunning(recheck) && remove(command)) // 若是執行緒池處於非執行狀態,而且把當前的任務從任務佇列中移除成功,則拒絕該任務 reject(command); //若執行緒數為0 else if (workerCountOf(recheck) == 0) // 若是以前的執行緒已經被銷燬完,新建一個非核心執行緒 addWorker(null, false); } //如果佇列已滿,再一次嘗試執行此任務,此時以最大執行緒數為閾值,失敗則執行拒絕策略 else if (!addWorker(command, false)) reject(command); }
以上的步驟可以以流程圖來表示
三、addWorker原始碼
addWorker 方法的主要做用是線上程池中建立一個執行緒並執行傳入的任務,若是返回 true 表明新增成功,若是返回 false 表明新增失敗。
-
firstTask:表示傳入的任務
-
core:布林值,
-
true :表明增長執行緒時判斷當前執行緒是否少於 corePoolSize,小於則cas將ctl加1,大於等於則不增長;
-
false :表明增長執行緒時判斷當前執行緒是否少於 maximumPoolSize,小於則則cas將ctl加1,大於等於則不增長
-
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立新執行緒w w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //加鎖 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); //將新建執行緒w加入集合 workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { //執行執行緒t t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
現在,我們已經建立了一個執行緒來執行任務,我們發現,當執行緒數小於核心執行緒數或者當佇列已經滿時且執行緒數小於最大執行緒數時,addWorker會建立一個新執行緒來執行新任務,那麼當阻塞佇列未滿時,怎麼複用核心執行緒呢?答案就在29行,Worker類中。
四、Worker類原始碼
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
/** Initial task to run. Possibly null. */
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //使用執行緒工廠建立執行緒
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
......
}
Worker
實現了Runnable
介面,並且有一個final
屬性thread
,當addWorker()
方法執行new Worker(firstTask)
時,構造器通過ThreadFactory
建立新執行緒,並將當前Runnable
傳入newThread()
。因此,addWorker()
方法執行t.start
後,當該執行緒搶到cpu執行權時,將執行runWorker(this);
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//取出w的firstTask
Runnable task = w.firstTask;
//將w的firstTask置為null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//執行緒第一次建立後,task不為null;當執行完之後的執行緒之後,該執行緒會從阻塞佇列中去取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//該方法的方法體為null,留給我們自己擴充套件
beforeExecute(wt, task);
try {
//直接呼叫task的run方法
task.run();
//該方法的方法體為null,留給我們自己擴充套件
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在這裡,我們終於找到了執行緒複用的最終實現,當呼叫execute
方法或者submit
方法時,會判斷當前執行緒數是不是小於核心執行緒數,或者當執行緒數大於核心執行緒數且小於最大執行緒數且阻塞佇列未滿,執行緒池會在addWorker
方法中新建一個Worker執行緒,並將該執行緒新增進執行緒集合中,執行緒狀態ctl加1,然後執行該執行緒。
在Worker
的run
方法中會呼叫runWorker
方法,該方法通過一個迴圈進行執行緒的複用,while (task != null || (task = getTask()) != null)
。當task≠null
(當執行緒執行建立執行緒時的任務時)或者(task = getTask()) != null
(從阻塞佇列中取任務賦值給task)時,直接執行task.run
方法而不是新建執行緒。也就是說,每一個執行緒都始終在一個大迴圈中,反覆獲取任務,而後執行任務,從而實現了執行緒的複用。