淺談java執行緒池(基於jdk1.8)
多執行緒讓程式世界豐富多彩,也讓其錯綜複雜。對於執行緒的建立和銷燬成了一筆不小的開銷,為了減少這些開銷,出現了執行緒池。執行緒池對執行緒進行管理,對於需要使用多執行緒的你來說,只需要把你的任務丟給執行緒池就可以了。當你把任務丟給執行緒池的時候,它是如何處理的呢?我們去原始碼中尋找蹤跡。
ThreadPoolExecutor
執行緒池在JDK中的主要實現類就是這個ThreadPoolExecutor。我們首先看一下他的建構函式
public ThreadPoolExecutor(int corePoolSize,//核心執行緒數
int maximumPoolSize,//最大執行緒數
long keepAliveTime,//存活時間
TimeUnit unit,//存活時間的單位(秒、毫秒等)
BlockingQueue<Runnable> workQueue,//阻塞佇列
RejectedExecutionHandler handler) {//拒絕策略
...
}
建構函式中出現的這幾個引數都是執行緒池的重要指標,我們用幾句話把它們串起來,順便說明他們是如何發揮作用的: 執行緒池中有兩種重要的元素,一是執行緒,二是阻塞佇列。 1、當執行緒池剛初始化時,執行緒為0,阻塞佇列為空。 2、第一個任務來臨時,執行緒池為它新建一個執行緒來執行,第二個任務來臨時,執行緒池再為它新建一個執行緒來執行,直到新建的執行緒數達到了核心執行緒數,執行緒池暫時就不會再新建執行緒了。 3、新來的任務將會被放到阻塞佇列中,隨著新任務的不斷到來,如果阻塞佇列已滿,那麼執行緒池將會繼續為新來的任務新建執行緒,直到執行緒數達到了最大執行緒數。 4、這時,對與新來的任務,執行緒池將不會直接接受,而是執行拒絕策略
下面我們去原始碼中尋找以上敘述的蹤跡
Execute執行緒池的執行入口
execute方法是執行的入口
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果此時執行緒數小於核心執行緒數,則增加執行緒處理任務
if (addWorker(command, true))//A
return;//增加成功,結束
c = ctl.get();
}//執行緒數已經不小於核心執行緒數,進行入隊
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//如果執行緒池已不再執行,取出任務,執行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)//此刻沒有執行緒,就增加一個執行緒
addWorker(null, false);//C
}//如果我們沒能成功入隊,那麼久增加一個執行緒,如果失敗;說明執行緒池已關閉或已飽和,執行拒絕策略
else if (!addWorker(command, false))//B
reject(command);
}
以上程式碼清晰可見,基本還原了上述文字敘述的流程。但我們發現當執行緒數不小於核心執行緒數時,入隊之後,還進行了一些檢測操作,就是看當前執行緒池是否還在執行,如果已經停止執行,那麼取出入隊的任務,執行拒絕策略。所以拒絕策略不只是執行緒池飽和之後執行,停止執行也會執行,當然這也是情理之中的事情。addWorker就是增加執行緒來處理任務,但我們發現這個方法的引數除了Runnable還有一個,是一個boobean值,並且在上面程式碼中的A處和B處分別呼叫了true和false,這裡有何奧妙?我們來剖開addWorker
增加工作者
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&//監測執行緒池狀態
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果此時有效執行緒數已經超過bound(核心執行緒數或最大執行緒數),返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
//CAS增加worker數量記錄,成功則跳出迴圈
break retry;
c = ctl.get(); // Re-read ctl準備重試
if (runStateOf(c) != rs)//如果執行緒池狀態傳送變化,從外迴圈重新開始(進行狀態監測)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
前期判斷
其實我們在上一段程式碼中就看到了c = ctl.get()這一句程式碼。這裡取出的c是一個int值,高3位表示的是此刻執行緒池的狀態,低29位表示的是此刻執行緒數。因為這個數會有多個執行緒對它進行操作,所以將它用AtomicInteger進行了包裝,並且提供從這個Int值中取出執行緒數和執行緒池狀態的方法(一些位操作),原始碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
從上也可以看到執行緒池有五種狀態。當呼叫shutdown()、shutdownnow()等方法時,執行緒池的狀態會發生變化,從而影響執行緒池對與新來任務的策略,這個也在addWorker中有所體現。進入addWorker這個方法首先就是進行執行緒池狀態的檢測,如果處於非執行狀態,就會返回false。但也有個特殊情況如果 (rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty())這三個條件同時為真的話,將不會返回false。這個條件滿足時是什麼狀態呢?執行緒池處於SHUTDOWN狀態,佇列不為空,且呼叫的是addWorker(null,true/false)。此時基本就是這麼個狀態:執行緒池準備關閉了,需要新建一些執行緒來把佇列中的任務處理掉。 看完了這個檢測過程,就進入了內for迴圈,這個for迴圈中首先判斷執行緒數是否超過了某個值,如果超過,返回false,不再新建執行緒。可以發現這個值是由addWorker的第二個引數控制的,如果為true,這個值就是corePollSize,如果為false,這個值就是maxinumPoolSize。分別對應了核心執行緒的新建,和超過核心執行緒數其他執行緒的新建。然後CAS改變執行緒數量記錄,如果成功,跳出迴圈,進行執行緒的新建。如果不成功,則重試,並且如果執行緒池狀態發生了變化,還需要繼續外層迴圈,重新進行狀態檢測。兩個for迴圈之後的程式碼就是進行執行緒的新建,並且啟動這個執行緒。新建執行緒的工作是在Worker的建構函式中進行的
執行緒的新建
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到建構函式將自己傳給了newThread來新建執行緒,也就是說Worker類有一個thread成員變數,這個thread又是通過Worker來構造的。而我們啟動這個執行緒的時候呼叫的就是這個執行緒的start方法,下面看一下這個執行緒的執行邏輯,也就是worker的run方法
任務的執行
public void run() {
runWorker(this);
}
run方法就是呼叫了runWorker方法,我們再進入runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這個方法主體是一個while迴圈,首先處理firstTask,處理完之後就去佇列裡getTask()。處理的過程很簡單,就是呼叫task的run方法(此刻的run方法呼叫才是對任務的處理)。getTask我們可以稍微看一下
任務的獲取
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?//判斷執行緒是否需要回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
如果執行緒不需要回收的話,他會去take任務,take方法如果取不到任務就會一直阻塞,取到就執行,因此這個執行緒就不會終結。但如果執行緒需要回收,那麼執行緒會去poll任務,阻塞時間一旦超過了keepAliveTime,poll就會返回null,從而執行緒也就不會繼續這個“取任務並執行任務”的迴圈,實現執行緒的回收。
各種執行緒池
JDK提供了一個工具類Executors來讓我們方便的建立各種執行緒池。 1、newFixedThreadPool 這個執行緒池中執行緒的數量是一定的,佇列無限長,不能及時處理的任務在佇列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2、newWorkStealingPool 這個執行緒池是一個支援並行任務處理的執行緒池,傳入的引數就是我們目標的並行度,為了減少爭用,內部可能出現多個佇列,實際的執行緒數也會動態的增加和減少,任務的先後執行順序並不是一定的。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
3、newSingleThreadExecutor 這個執行緒池中只會有一個執行緒和一個無界佇列。可以保證任務的執行順序,並且任何一個時刻只有一個任務在執行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4、newCachedThreadPool 這個執行緒池只會在需要的時候建立執行緒,每個執行緒如果空閒時間超過60秒就會被回收。對執行緒的數量沒有限制,有記憶體溢位的風險。但長時間不適用的話它將是耗費資源最小的執行緒池,因為所有的執行緒都會被回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
5、newScheduledThreadPool 這個執行緒池可以用來週期性的執行一些任務
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}