執行緒池原理--執行器ThreadPoolExecutor
文章目錄
執行緒池原理–總索引
執行緒池原理–執行器ThreadPoolExecutor
ThreadPoolExecutor 是Executor的核心實現類。
屬性
- 執行緒池執行狀態
- RUNNING:接受新的任務,並且處理佇列中的任務
- SHUTDOWN: 不接受新的任務,但是仍然處理佇列中的任務
- STOP: 不接受新的任務,也不處理佇列中的任務
- TIDYING: 所有的任務已經結束, workerCount 為0,程式會呼叫鉤子方法
terminated(),這個什麼也沒做。 - TERMINATED: 所有的任務都已經完成。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- AtomicInteger ctl
原子整形包裝的執行緒池控制狀態(The main pool control state)。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//rs :高3位,runState , wc : 低29位,workerCount,執行緒池中當前活動的執行緒數量
private static int ctlOf(int rs, int wc) { return rs | wc; }
- CAPACITY 執行緒池的最大容量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- 任務佇列,用於存放執行未結束的任務。詳細看執行緒池原理–任務佇列BlockingQueue
private final BlockingQueue<Runnable> workQueue;
- ReentrantLock 用於對於工作集和related bookkeeping(不知道啥意思)的併發訪問鎖控制。
private final ReentrantLock mainLock = new ReentrantLock();
- 工作執行緒集,Worker是ThreadPoolExecutor類的內部類,並實現了Runnable介面,使用者提交的任務都是給Worker執行緒執行。
private final HashSet<Worker> workers = new HashSet<>();
- Tracks largest attained pool size. Accessed only under mainLock.
private int largestPoolSize;
private final Condition termination = mainLock.newCondition();
- 完成任務的計數器,當任務結束時更新。
private long completedTaskCount;
- 建立新執行緒的工廠
private volatile ThreadFactory threadFactory;
- 拒絕策略,詳情察看
執行緒池原理–拒絕策略之RejectedExecutionHandler類
private volatile RejectedExecutionHandler handler;
- 預設的拒絕策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
- 當 allowCoreThreadTimeOut設定為true.
如果當前執行緒池中的執行緒如果大於corePoolSize,那麼如果空閒時間超過keepAliveTime,那麼就會銷燬掉一些執行緒。
否則,就一直等到有新的任務執行。
private volatile long keepAliveTime;
- true:核心執行緒即使空閒也不會被銷燬掉。
- false:核心執行緒空閒超過keepAliveTime定義的超時時間,則會被銷燬掉。
private volatile boolean allowCoreThreadTimeOut;
- 執行緒池核心執行緒大小
private volatile int corePoolSize;
- 最大的執行緒池大小 = 核心執行緒大小 + 非核心執行緒大小
private volatile int maximumPoolSize;
-
執行時許可權
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
- 呼叫finalize會用到
private final AccessControlContext acc;
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
構造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
構造器引數介紹
下面來解釋下各個引數:
-
int corePoolSize:該執行緒池中核心執行緒數最大值
核心執行緒:執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒,核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉。 -
int maximumPoolSize: 該執行緒池中執行緒總數最大值
執行緒總數 = 核心執行緒數 + 非核心執行緒數。 -
long keepAliveTime:
當 allowCoreThreadTimeOut設定為true.
如果當前執行緒池中的執行緒如果大於corePoolSize,那麼如果空閒時間超過keepAliveTime,那麼就會銷燬掉一些執行緒。
否則,就一直等到有新的任務執行。 -
TimeUnit unit:keepAliveTime的單位
TimeUnit是一個列舉型別,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小時
DAYS : 天 -
BlockingQueue ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。
- LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
- PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
- DelayQueue: 一個使用優先順序佇列實現的無界阻塞佇列。
- SynchronousQueue: 一個不儲存元素的阻塞佇列。
- LinkedTransferQueue: 一個由連結串列結構組成的無界阻塞佇列。
- LinkedBlockingDeque: 一個由連結串列結構組成的雙向阻塞佇列。
-
ThreadFactory threadFactory:執行緒工廠,用於建立執行緒執行 我們提交的任務。
-
RejectedExecutionHandler handler:這個指定當佇列滿時繼續新增任務該u做如何處理,詳細可以看執行緒池原理–拒絕策略之RejectedExecutionHandler類。
execute()方法
execute()方法用於提交任務。
//提交的Runnable介面的實現類
//實際使用者可以提交Runnable介面的實現類或者Callable介面的實現類,AbstractExecutorService會在submit()方法中進行預處理,將Callable型別物件轉化為Runnable型別物件
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
*/
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(需要獲得全域性鎖)
- 如果執行的執行緒等於或多於corePoolSize ,則將任務加入BlockingQueue
- 如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(需要獲得全域性鎖)
- 如果建立新執行緒將使當前執行的執行緒超出maxiumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。
- addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
引數:
firstTask: worker執行緒的初始任務,可以為空
core: true:將corePoolSize作為上限,false:將maximumPoolSize作為上限
addWorker方法有4種傳參的方式:
1、addWorker(command, true)
2、addWorker(command, false)
3、addWorker(null, false)
4、addWorker(null, true)
在execute方法中就使用了前3種,結合這個核心方法進行以下分析
第一個:執行緒數小於corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false
第二個:當佇列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果執行緒池也滿了的話就返回false
第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務佇列裡拿任務,這樣就相當於建立了一個新的執行緒,只是沒有馬上分配任務
第四個:這個方法就是放一個null的task進Workers Set,而且是在小於corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為執行緒池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行