# Thread Pools: Basic Usage and How They Work
阿新 • Published: 2022-03-03
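This post starts from the source code and analyzes the basic usage of thread pools and the principles behind their core code. Version analyzed: JDK 1.8.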
## Introduction to Thread Pools
> In [computer programming](https://encyclopedia.thefreedictionary.com/Computer+programming), a **thread pool** is a software design pattern for achieving [concurrency](https://encyclopedia.thefreedictionary.com/Concurrency+(computer+science)) of execution in a computer program. Often also called a **replicated workers** or **worker-crew model**,[[1\]](https://encyclopedia.thefreedictionary.com/Thread+pool+pattern#cite_note-1) a thread pool maintains multiple [threads](https://encyclopedia.thefreedictionary.com/Thread+(computer+science)) waiting for [tasks](https://encyclopedia.thefreedictionary.com/Task+(computers)) to be allocated for [concurrent](https://encyclopedia.thefreedictionary.com/Concurrent+computing) execution by the supervising program. By maintaining a pool of threads, the model increases performance and avoids latency in execution due to frequent creation and destruction of threads for short-lived tasks.[[2\]](https://encyclopedia.thefreedictionary.com/Thread+pool+pattern#cite_note-2) The number of available threads is tuned to the computing resources available to the program, such as [parallel](https://encyclopedia.thefreedictionary.com/Parallel+computing) [processors](https://encyclopedia.thefreedictionary.com/Central+processing+unit), cores, memory, and network sockets.[[3\]](https://encyclopedia.thefreedictionary.com/Thread+pool+pattern#cite_note-3)
>
> -- Excerpted from Wikipedia
>
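> **Thread pool**: a [thread](https://baike.baidu.com/item/%E7%BA%BF%E7%A8%8B) usage pattern. Too many threads add scheduling overhead, which in turn hurts cache locality and overall performance. A thread pool maintains multiple threads that wait for the supervising program to assign them tasks that can run concurrently. This avoids the cost of creating and destroying threads for short-lived tasks. A thread pool not only keeps the cores fully utilized but also prevents excessive scheduling. The number of available threads should depend on the number of available concurrent processors, processor cores, memory, network sockets, and so on.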
>
> -- Excerpted from Baidu Baike
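
These introductions are easy to follow. The idea of a **pool** is also very common in everyday development, for example database connection pools. The main goals are easier resource management and lower overhead.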
## Basic Usage
```java
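@Test
public void contextLoads() throws IOException {
    // Create a fixed-size thread pool
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    // Submit a task for execution
    executorService.execute(() -> {
        System.out.println("running thread " + Thread.currentThread());
    });
    // Shut down the pool: no new tasks are accepted, already-submitted tasks still run
    executorService.shutdown();
    // Block the test thread so the output can be observed before the test ends
    System.in.read();
}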
```
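The code above is a simple example of using a thread pool. `Executors` is a utility class in JUC (`java.util.concurrent`) that makes thread pools easy to use; it provides a layer of encapsulation for developers who are not very familiar with thread pools.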
The code above creates a FixedThreadPool. `Executors` also provides factory methods for several other kinds of thread pools.
> ### FixedThreadPool
>
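> Creates a fixed-size thread pool (core and maximum size are both `nThreads`, backed by an unbounded `LinkedBlockingQueue`).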
>
> ```java
> public static ExecutorService newFixedThreadPool(int nThreads) {
>     return new ThreadPoolExecutor(nThreads, nThreads,
>                                   0L, TimeUnit.MILLISECONDS,
>                                   new LinkedBlockingQueue<Runnable>());
> }
> ```
>
> ### CachedThreadPool
>
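> This pool has a core pool size of 0. When a task is executed, the pool first checks whether an idle thread is available; if so it reuses that thread, otherwise it creates a new one. Idle threads are released after 60 seconds (the `60L, TimeUnit.SECONDS` in the code below).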
>
> ```java
> public static ExecutorService newCachedThreadPool() {
>     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
>                                   60L, TimeUnit.SECONDS,
>                                   new SynchronousQueue<Runnable>());
> }
> ```
>
> ### SingleThreadExecutor
>
> Creates a thread pool that contains only a single thread.
>
> ```java
> public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
>     return new FinalizableDelegatedExecutorService
>         (new ThreadPoolExecutor(1, 1,
>                                 0L, TimeUnit.MILLISECONDS,
>                                 new LinkedBlockingQueue<Runnable>(),
>                                 threadFactory));
> }
> ```
>
> ### ScheduledThreadPoolExecutor
>
> A thread pool that supports delayed (and periodic) execution.
>
> ```java
> public ScheduledThreadPoolExecutor(int corePoolSize) {
>     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
>           new DelayedWorkQueue());
> }
> ```
>
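> Besides these four, `Executors` offers a few other thread pools (for example `newWorkStealingPool` and `newSingleThreadScheduledExecutor`) that are worth a quick look if you need them.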
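
As a small illustration of the delayed-execution pool, here is a sketch that uses `Executors.newScheduledThreadPool` (a factory that returns a `ScheduledThreadPoolExecutor`) to run a `Callable` once after a delay and measure how long it took. The class `ScheduledDemo` and method `runDelayed` are hypothetical names for this example, not part of the JDK.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not part of the JDK)
class ScheduledDemo {
    // Schedules a Callable to run once after delayMillis and returns the
    // elapsed time (in ms) until its result became available.
    static long runDelayed(long delayMillis) throws Exception {
        // Backed by a ScheduledThreadPoolExecutor with one core thread
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            long start = System.nanoTime();
            ScheduledFuture<String> future =
                    scheduler.schedule(() -> "done", delayMillis, TimeUnit.MILLISECONDS);
            future.get(); // blocks until the delayed task has run
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("elapsed ms: " + runDelayed(200));
    }
}
```

The elapsed time should never be shorter than the requested delay; how much longer it is depends on scheduling.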
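## How It Works
### Creating a Thread Pool
All of the factory methods above ultimately come down to constructing a `ThreadPoolExecutor`. Here is its full constructor: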
```java
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:
 *         {@code corePoolSize < 0}
 *         {@code keepAliveTime < 0}
 *         {@code maximumPoolSize <= 0}
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
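> corePoolSize: the number of core threads
>
> maximumPoolSize: the maximum number of threads
>
> keepAliveTime: how long an idle thread beyond the core size waits for a new task before terminating
>
> unit: the time unit of keepAliveTime
>
> workQueue: the queue that holds tasks waiting to be executed
>
> threadFactory: the factory used to create new threads
>
> handler: the callback invoked when a task is rejected (the rejection policy), somewhat like a fallback in circuit breaking
>
> **Some** of the pool types above are implemented simply by choosing different values for these parameters.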
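
Since every pool is just a combination of these parameters, you can also pass them all to the constructor directly. The sketch below is one possible configuration, not a recommendation from the original post: a bounded `ArrayBlockingQueue`, a hypothetical thread-naming scheme (`demo-worker-N`), and the JDK's built-in `CallerRunsPolicy` as the handler. `CustomPoolDemo` and `newPool` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not part of the JDK)
class CustomPoolDemo {
    // Builds a pool by passing all seven constructor parameters explicitly.
    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger(1);
        // Hypothetical naming scheme: "demo-worker-1", "demo-worker-2", ...
        ThreadFactory factory = r -> new Thread(r, "demo-worker-" + seq.getAndIncrement());
        return new ThreadPoolExecutor(
                2,                                           // corePoolSize
                4,                                           // maximumPoolSize
                30L, TimeUnit.SECONDS,                       // keepAliveTime + unit
                new ArrayBlockingQueue<Runnable>(100),       // bounded workQueue
                factory,                                     // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy());  // handler: run rejected task in the caller
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() -> System.out.println("running in " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

A bounded queue means the pool can actually grow toward `maximumPoolSize` and eventually reach the rejection handler, whereas the unbounded `LinkedBlockingQueue` used by `newFixedThreadPool` never fills up.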
### Task Execution
#### Flowchart
![Thread pool execution flowchart](https://imgconvert.csdnimg.cn/aHR0cDovL2Fzc2V0cy5wcm9jZXNzb24uY29tL2NoYXJ0X2ltYWdlLzVmMzI1YzdjNjM3Njg5MzEzYWNhMWE1My5wbmc?x-oss-process=image/format,png)
#### execute
Below is the method the thread pool uses to schedule a task for execution:
```java
/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl packs the pool's run state and worker count into one int:
    // the high 3 bits hold the run state, the low 29 bits hold the worker count
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { // fewer workers than corePoolSize?
        if (addWorker(command, true)) // add a worker; true = bounded by corePoolSize, false = by maximumPoolSize
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) { // pool is running: try to enqueue the task
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) // recheck: pool stopped running, so take the task back
            reject(command); // apply the rejection policy
        else if (workerCountOf(recheck) == 0) // all workers have exited: start one to process the queue
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // could not enqueue: try a non-core worker, reject if that fails
        reject(command);
}
```
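The comment about `ctl` is easier to see with numbers. The following sketch re-creates the bit-packing helpers from JDK 8's `ThreadPoolExecutor` in a stand-alone class; the constant and helper names are copied from the JDK source, while the class `CtlSketch` itself is hypothetical. Because `RUNNING` is negative, the states are ordered `RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED`, which is why checks such as `rs >= SHUTDOWN` work.

```java
// Hypothetical stand-alone copy of the ctl helpers (not the JDK class itself)
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1; // low 29 bits all set

    // Run states are stored in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // pack state and count
    static boolean isRunning(int c)  { return c < SHUTDOWN; }  // only RUNNING is negative

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 3);
        System.out.println(Integer.toBinaryString(c));  // 11100000000000000000000000000011
        System.out.println(runStateOf(c) == RUNNING);   // true
        System.out.println(workerCountOf(c));           // 3
        c++; // adding 1 to ctl adds one worker without touching the state bits
        System.out.println(workerCountOf(c));           // 4
    }
}
```

This is also why `compareAndIncrementWorkerCount` can update the worker count with a single CAS on `ctl`.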
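> That is the overall flow of `execute`; it is essentially a series of checks. The JDK authors have already described the concrete steps in a comment in the source: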
>
> ```java
> /*
> * Proceed in 3 steps:
> *
> * 1. If fewer than corePoolSize threads are running, try to
> * start a new thread with the given command as its first
> * task. The call to addWorker atomically checks runState and
> * workerCount, and so prevents false alarms that would add
> * threads when it shouldn't, by returning false.
> *
> * 2. If a task can be successfully queued, then we still need
> * to double-check whether we should have added a thread
> * (because existing ones died since last checking) or that
> * the pool shut down since entry into this method. So we
> * recheck state and if necessary roll back the enqueuing if
> * stopped, or start a new thread if there are none.
> *
> * 3. If we cannot queue task, then we try to add a new
> * thread. If it fails, we know we are shut down or saturated
> * and so reject the task.
> */
> ```
>
> The core of execution lies in `addWorker`, so let's look at its code next.
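
To watch those three steps happen in order, the sketch below uses a deliberately tiny pool (core 1, max 2, queue capacity 1) and tasks that block on a `CountDownLatch`, so no worker can finish early. The first task starts a core worker, the second is queued, the third starts a non-core worker, and the fourth is rejected by the default `AbortPolicy`. `ExecuteOrderDemo` and `run` are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not part of the JDK)
class ExecuteOrderDemo {
    // Returns {poolSize, queueSize, rejectedCount} right after submitting four blocking tasks.
    static int[] run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until we release it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        int rejected = 0;
        pool.execute(blocker); // step 1: 0 < corePoolSize, starts core worker #1
        pool.execute(blocker); // step 2: core is full, the task goes into the queue
        pool.execute(blocker); // step 3: queue is full, starts non-core worker #2
        try {
            pool.execute(blocker); // both limits reached: AbortPolicy throws
        } catch (RejectedExecutionException e) {
            rejected++;
        }
        int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown(); // let all tasks finish
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // [2, 1, 1]
    }
}
```

Note the order: the queue is filled *before* any non-core thread is created, so with an unbounded queue the pool never grows beyond `corePoolSize`.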
#### addWorker
```java
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) { // check the state and queue, then CAS-increment the worker count
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check the pool state: once shut down, the pool accepts no new tasks,
        // but a worker without a firstTask may still be added to drain queued tasks.
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) // CAS the worker count; on success leave both loops
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs) // the run state changed: restart the outer loop
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false; // set once the worker thread has been started
    boolean workerAdded = false;   // set once the worker has been added to the set
    Worker w = null;
    try {
        w = new Worker(firstTask); // Worker implements Runnable
        final Thread t = w.thread; // the thread that will run this worker
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // Check the pool state, firstTask, and the worker thread's state
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w); // add the worker to the set
                    int s = workers.size();
                    if (s > largestPoolSize) // record the largest pool size ever reached
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) { // once added, start the worker's thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w); // roll back if the worker could not be started
    }
    return workerStarted;
}
```
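The first half of `addWorker` is a classic CAS retry loop: read the count, check the bound, try `compareAndSet`, and retry if another thread got there first. The sketch below isolates that pattern with an `AtomicInteger`; it leaves out the run-state checks, and `BoundedCounter`, `tryIncrement`, and `race` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not part of the JDK)
class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    BoundedCounter(int limit) {
        this.limit = limit;
    }

    // Reserve one slot if the count is below the limit. Same shape as the inner
    // loop of addWorker: read, check the bound, try a CAS, retry on CAS failure.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= limit)
                return false;                  // bound reached (cf. wc >= corePoolSize/maximumPoolSize)
            if (count.compareAndSet(c, c + 1))
                return true;                   // CAS won (cf. compareAndIncrementWorkerCount)
            // another thread changed count between get() and compareAndSet(): retry
        }
    }

    // Starts `threads` threads that each call tryIncrement() once; returns the final count.
    static int race(int limit, int threads) throws InterruptedException {
        BoundedCounter counter = new BoundedCounter(limit);
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(counter::tryIncrement);
            ts[i].start();
        }
        for (Thread t : ts)
            t.join();
        return counter.count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race(5, 10)); // 10 threads compete for 5 slots: prints 5
    }
}
```

No lock is needed for the count itself; `addWorker` only takes `mainLock` afterwards, to update the `workers` set.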
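`addWorker` performs validation and wraps `firstTask` in a `Worker`. The actual work is delegated to the `Worker`, so we also need to understand how a `Worker` runs. `Worker.run()` simply calls `runWorker()`, so let's look at that method next.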
#### runWorker
```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // the firstTask the worker was created with
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep looping while there is a task: first firstTask, then whatever
        // getTask() takes from the blocking queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Template-method hook; empty by default
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Call the task's run() directly on this worker thread
                    // instead of starting a new thread for it
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Another template-method hook
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++; // count tasks completed by this worker
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Clean up the worker (remove it and possibly replace it)
        processWorkerExit(w, completedAbruptly);
    }
}
```
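From `runWorker` we can see that a task is executed simply by calling its `run` method directly. After finishing one task, the worker loops and takes the next task from the queue; once no more tasks are returned, the worker is cleaned up. The interesting parts are therefore `getTask()`, which takes tasks from the queue, and `processWorkerExit()`, which cleans up after the worker stops. Let's analyze `getTask` and `processWorkerExit` in turn.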
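
The `beforeExecute`/`afterExecute` calls in `runWorker` are protected hooks that a subclass can override. As a sketch (the subclass `HookedPool` and its counters are hypothetical), the pool below counts started tasks and tasks that threw. The hooks run on the worker thread itself, immediately around `task.run()`.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass for illustration (not part of the JDK)
class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet(); // runs on the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {           // non-null only when task.run() itself threw
            failed.incrementAndGet();
        }
    }

    // Runs one normal and one failing task and returns {started, failed}.
    static int[] demo() throws InterruptedException {
        HookedPool pool = new HookedPool(1);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {pool.started.get(), pool.failed.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(demo())); // [2, 1]
    }
}
```

One caveat: for tasks submitted with `submit()`, the task is wrapped in a `FutureTask` that catches the exception itself, so `afterExecute` sees `t == null` and the exception only surfaces through `Future.get()`.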
#### getTask
```java
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // If the pool is shut down and is either STOP or has an empty queue,
        // decrement workerCount and return null (returning null makes the worker exit).
        // Note: tasks left in the queue are still executed, unless the pool is STOP.
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // Current number of workers
        int wc = workerCountOf(c);

        // timed decides whether this worker is subject to the idle timeout.
        // allowCoreThreadTimeOut defaults to false, i.e. core threads stay alive;
        // otherwise only workers beyond corePoolSize (the non-core ones) time out.
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Exit if there are more workers than maximumPoolSize, or this worker is timed
        // and its last poll timed out, as long as it is not the last worker or the queue is empty
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Timed workers wait at most keepAliveTime with poll(); others block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null) // got a task: return it
                return r;
            timedOut = true; // poll() timed out without a task
        } catch (InterruptedException retry) {
            timedOut = false; // interrupted (e.g. by shutdown): loop and recheck the state
        }
    }
}
```
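From `getTask` we can see that after `shutdown()` the tasks already in the queue are still executed, whereas once the pool reaches STOP (for example after `shutdownNow()`) no further tasks are taken. Core threads stay alive by blocking in `take()` (unless core thread timeout is enabled), while non-core threads fetch tasks with `poll()` and enter the exit logic once the poll times out. Strictly speaking, the pool does not label individual threads as core or non-core: whichever worker sees more than `corePoolSize` workers uses `poll()` and may be the one that exits. Finally, let's look at how a worker exits.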
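
The effect of the `timed` flag can be observed from outside. In the sketch below (hypothetical class `KeepAliveDemo`; the sizes and the 100 ms keep-alive are arbitrary choices), a `SynchronousQueue` forces the pool to grow to three workers; after they go idle and the keep-alive has long expired, the pool should shrink back to `corePoolSize`, or to zero when `allowCoreThreadTimeOut(true)` is set. Because it relies on `Thread.sleep`, treat it as a timing-based demonstration rather than a precise test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not part of the JDK)
class KeepAliveDemo {
    // Grows a pool (core 1, max 3, keepAlive 100 ms) to three workers, lets them go idle,
    // then returns the pool size after waiting well past keepAliveTime.
    static int idlePoolSize(boolean allowCoreTimeout) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout); // true requires keepAliveTime > 0
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            // SynchronousQueue has no capacity, so tasks 2 and 3 each force a new worker
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown(); // all three workers finish and go back to getTask()
        Thread.sleep(1000);  // much longer than keepAliveTime
        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("allowCoreThreadTimeOut=false -> " + idlePoolSize(false));
        System.out.println("allowCoreThreadTimeOut=true  -> " + idlePoolSize(true));
    }
}
```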
#### processWorkerExit
```java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add this worker's completed-task count to the pool total
        completedTaskCount += w.completedTasks;
        workers.remove(w); // remove the worker from the set
    } finally {
        mainLock.unlock();
    }

    // Try to move the pool toward TERMINATED
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) { // the pool is still RUNNING or SHUTDOWN
        if (!completedAbruptly) { // normal exit: getTask() returned null
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // workers that should stay alive
            if (min == 0 && ! workQueue.isEmpty()) // none required, but tasks remain: keep at least one worker
                min = 1;
            if (workerCountOf(c) >= min) // enough workers remain, so let this thread exit
                return; // replacement not needed
        }
        // Abrupt exit (the task threw) or too few workers: start a replacement worker
        addWorker(null, false);
    }
}
```
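The `completedAbruptly` branch is easy to trigger: a task that throws from `execute()` escapes `runWorker`, so its thread dies and `processWorkerExit` starts a replacement worker. The sketch below (hypothetical class `WorkerReplacementDemo`) shows that the next task in a single-thread `FixedThreadPool` runs on a different thread. The default uncaught-exception handler prints the failing task's stack trace to stderr.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class (not part of the JDK)
class WorkerReplacementDemo {
    // Returns {thread that ran a failing task, thread that ran the next task}.
    static String[] threadNames() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicReference<String> first = new AtomicReference<>();
        pool.execute(() -> {
            first.set(Thread.currentThread().getName());
            // Escapes runWorker, so the worker exits with completedAbruptly = true
            throw new IllegalStateException("task failed");
        });
        Callable<String> nameTask = () -> Thread.currentThread().getName();
        String second = pool.submit(nameTask).get(); // waits for the second task
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new String[] {first.get(), second};
    }

    // True if the second task ran on a different (replacement) thread.
    static boolean replaced() throws Exception {
        String[] names = threadNames();
        return names[0] != null && !names[0].equals(names[1]);
    }

    public static void main(String[] args) throws Exception {
        String[] names = threadNames();
        System.out.println(names[0] + " -> " + names[1]);
    }
}
```

This is one reason uncaught exceptions in pooled tasks are costly: each one throws away a thread and creates a new one.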
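#### Summary
> A thread pool executes tasks by delegating them to **Worker** objects.
>
> **Worker** itself implements **Runnable**: the worker's thread is started and calls the **Worker**'s **run** method, which in turn calls each task's **run** method to execute the task logic. **Worker** also extends **AQS** (`AbstractQueuedSynchronizer`) to implement its own non-reentrant exclusive lock; holding this lock indicates that the thread is currently running a task and should not be interrupted.
>
> In **getTask**, a core thread blocks in the blocking queue's **take** method until a new task is added to the queue. A non-core thread is destroyed once it has stayed idle longer than keepAliveTime; the timeout comes down to whether the worker fetches tasks from the blocking queue with **take** or with **poll**.
>
> Core threads are reclaimed only if **allowCoreThreadTimeOut** is set to true, in which case core threads that time out are destroyed as well.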
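
The Worker lock described above can be sketched as a stand-alone AQS subclass. The JDK's `Worker` uses the same three overrides (state 0 = unlocked, 1 = locked), but the class below is a hypothetical extraction for illustration. The key property is that it is *not* reentrant: `interruptIdleWorkers` calls `tryLock()` on each worker and only interrupts those it can lock, so a busy worker is skipped. The JDK source comments that a plain `ReentrantLock` was avoided so that a task calling a pool control method such as `setCorePoolSize` cannot reacquire its own worker's lock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical stand-alone version of the lock inside ThreadPoolExecutor.Worker
class NonReentrantLock extends AbstractQueuedSynchronizer {
    // state 0 = unlocked, 1 = locked
    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false; // no owner check, so even the owning thread cannot re-acquire
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    boolean isLocked() { return isHeldExclusively(); }

    public static void main(String[] args) {
        NonReentrantLock lock = new NonReentrantLock();
        System.out.println(lock.tryLock()); // true: the lock was free
        System.out.println(lock.tryLock()); // false: same thread, but the lock is not reentrant
        lock.unlock();
        System.out.println(lock.tryLock()); // true again after unlock
    }
}
```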