多執行緒學習(6)執行緒池 ThreadPool
threadpool模型:
呼叫方通過呼叫api將任務,裝進queue裡,然後會有一個機制監事queue裡有沒有task,如果有task,就分配給某個worker去執行。workers代表執行緒池的話.worker就是某條執行緒了。
執行緒池的構造方法:
Executor框架最核心的類是ThreadPoolExecutor,他是執行緒池的實現類,主要由下列7個元件構成。
package java.util.concurrent; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize, // 執行緒池可使用執行緒數的最小值
int maximumPoolSize, // 執行緒池容量的最大值
maximumPoolSize:是一個靜態變數,在變數初始化的時候,有建構函式指定.
long keepAliveTime, // 當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。這裡把keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止。
TimeUnit unit, // 執行緒的阻塞時間單位,它的執行方法是TimeUnit.unit
內部呼叫了Thread.sleep()方法。但是它和Thread.sleep()方法的區別是,Thread.Sleep只能設定毫秒數,而TimeUnit.unit.Sleep()中的unit可以換成時間單位,比如DAYS、HOURS、MINUTES,SECONDS、MILLISECONDS和NANOSECONDS。
TimeUnit.MINUTES.sleep(4); // sleeping for 4 minutes
BlockingQueue<Runnable> workQueue, // 阻塞佇列,裡面是Runnable型別,執行緒的任務
ThreadFactory threadFactory, // 建立執行緒,併為執行緒指定queue裡面的runnable,執行緒池的構造方法,支援自定義threadFactory傳入,我們可以自己編寫newThread()方法,來實現自定義的執行緒建立邏輯。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
RejectedExecutionHandler handler // 當ThreadPoolExecutor已經關閉或ThreadPoolExecutor已經飽和時(達到了最大執行緒池大小且工作佇列已滿),execute()方法將要呼叫的Handler。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
並且這些成員變數,都是volatile修飾的
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
執行緒池成員屬性和api方法介紹
largestPoolSize: 是一個動態變數,是記錄執行緒曾經達到的最高值,也就是 largestPoolSize<= maximumPoolSize.
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
completedtaskcount:
返回已完成執行的近似任務總數。因為在計算期間任務和執行緒的狀態可能動態改變,所以返回值只是一個近似值,但是該值在整個連續呼叫過程中不會減少。
當一個執行緒在workers容器中,準備remove時,執行緒會將自己的completedtaskcount賦值給執行緒池的completedtaskcount。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
TaskCount 執行緒池執行的總任務數,包括已經執行完的任務數和任務佇列中目前還需要執行的任務數
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
getActiveCount();Thread.activeCount() 得到是存活的執行緒數 返回值是int型別
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
常見執行緒池型別:
singletenthreadPool:
SingleThreadExecutor是使用單個worker執行緒的Executor。下面是SingleThreadExecutor的原始碼實現。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor的corePoolSize和maximumPoolSize被設定為1。其他引數與FixedThreadPool相同。SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界佇列作為工作佇列對執行緒池帶來的影響與FixedThreadPool相同,這裡就不贅述了。
- 如果當前執行的執行緒數少於corePoolSize(即執行緒池中無執行的執行緒),則建立一個新執行緒來執行任務。
- 線上程池完成預熱之後(當前執行緒池中有一個執行的執行緒),將任務加入LinkedBlockingQueue。
- 執行緒執行完1中的任務後,會在一個無限迴圈中反覆從LinkedBlockingQueue獲取任務來執行。
fixedthreadpool:
package java.util.concurrent;
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
FixedThreadPool的corePoolSize和maximumPoolSize都被設定為建立FixedThreadPool時指定的引數nThreads。
當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。這裡把keepAliveTime設定為0L,意味著多餘的空閒執行緒會被立即終止。
FixedThreadPool的execute()方法的執行示意圖如下所示。
對上圖的說明如下。
- 如果當前執行得執行緒數少於corePoolSize,則建立執行緒來執行任務。
- 線上程池完成預熱之後(當前執行的執行緒數等於corePoolSize),將任務加入LinkedBlockingQueue。
- 執行緒執行完1中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行。
FixedThreadPool使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列(佇列的容量為Integer.MAX_VALUE)。使用無界佇列作為工作佇列會對執行緒池帶來如下影響。
- 當執行緒池中的執行緒數達到corePoolSize後,新任務將在無界佇列中等待,因此執行緒池中的執行緒數不會超過corePoolSize。
- 由於1,使用無界佇列時maximumPoolSize將是一個無效引數。
- 由於1和2,使用無界佇列時keepAliveTime將是一個無效引數。
- 由於使用無界佇列,執行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會呼叫RejectedExecutionHandler.rejectedExecution方法)。
cachethreadpool:
CacheThreadPool是一個會根據需要建立新執行緒的執行緒池。下面是建立CacheThreadPool的原始碼。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CacheThreadPool的corePoolSize被設定為0,即corePool為空;maximumPoolSize被設定為Integer.MAX_VALUE,即maximumPool是無界的。這裡把keepAliveTime設定為60L,意味著CacheThreadPool中的空閒執行緒等待新任務的最長時間為60秒,空閒執行緒超過60秒後將會被終止。
FixedThreadPool和SingleThreadExecutor使用無界佇列LinkedBlockingQueue作為執行緒池的工作佇列。CacheThreadPool使用沒有容量的SynchronousQueue作為執行緒池的工作佇列,但CacheThreadPool的maximumPool是無界的。這意味著,如果主執行緒提交任務的速度高於maximumPool中執行緒處理任務的速度時,CacheThreadPool會不斷建立新執行緒。極端情況下,CacheThreadPool會因為建立過多執行緒而耗盡CPU和記憶體資源。
對上圖的說明如下。
- 首先執行SynchronousQueue.offer(Runnable task)。如果當前maximumPool中有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒執行的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,execute()方法執行完成;否則執行下面的步驟2。
- 當初始maximumPool為空,或者maximumPool中當前沒有空閒執行緒時,將沒有執行緒執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這種情況下,步驟1將失敗。此時CachedThreadPool會建立一個新執行緒執行任務,execute()方法執行完成。
- 在步驟2中新建立的執行緒將任務執行完後,會執行SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。這個poll操作會讓空閒執行緒最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內主執行緒提交了一個新任務(主執行緒執行步驟1),那麼這個空閒執行緒將執行主執行緒提交的新任務;否則,這個空閒執行緒將終止。由於空閒60秒的空閒執行緒會被終止,因此長時間保持空閒的CachedThreadPool不會使用任務資源。
前面提到過,SynchronousQueue是一個沒有容量的阻塞佇列。每個插入操作必須等待另一個執行緒的對應移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主執行緒提交的任務傳遞給空閒執行緒執行。CachedThreadPool中任務傳遞的示意圖如下所示。
ScheduledThreadPool
執行定時任務的執行緒池
建立執行緒池的四種方式
這四種方式,都實現了RejectedExecutionHandler介面
Abortpolicy
會丟擲異常,導致當前執行緒退出
當我們建立執行緒池時,不指定rejectedExecutionHandler時,就會預設使用AbortPolicy,當我們通過executor.execute(runnable)任務時,可能會發生異常,並將異常直接返回給了呼叫者。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
CallerRunsPolicy
當執行緒池的存活執行緒數,達到了最大值,此時又有新的請求過來,執行緒池會呼叫rejectedExecutionHandler這個介面的實現類的rejectedExecution的方法,此時該實現類正好是CallerRunsPolicy,它會讓新請求,在自己的執行緒上執行run方法,如果run方法消耗時間長,它會阻塞web容器的請求,影響web容器處理其他請求的效能。
當有外部請求訪問web服務端時,tomcat會分配一條執行緒(tomcat預設有150個執行緒,可以配置最大的為1500個執行緒來接收處理請求,且這些執行緒之間具有隔離性不會互相影響對方)來處理這個請求,當這個請求要用到執行緒池,且我們的執行緒池是基於CallerRunsPolicy來建立的,那麼CallerRunsPolicy會,使用當前請求的執行緒,來執行run方法。而當這個run方法執行時間過長時,tomcat的請求就會被佔用不放,導致無法拿出空閒的執行緒去處理其他請求,就會影響到服務端的效能。
應用場景:當我們希望執行緒池滿了之後,進行阻塞,就使用CallerRunsPolicy,阻塞的是呼叫方的,不會往queue裡放任務了。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
看上面的rejectedExecution方法體,很有意思,它執行執行緒的方式,是r.run()而不是start()方法,這很耐人尋味,原因有兩個
我們在main方法中,準備啟動一個執行緒時,如果在程式碼中我們使用thread.star()方法,jvm在執行到這行時,實際上會建立一個新的執行緒,來執行執行緒物件中的run方法,此時在執行run方法的執行緒,與執行main方法的執行緒,是兩條執行緒,沒有關聯。而上面呼叫了runnable介面例項的run方法,jvm在執行時,根本不會建立新執行緒去執行,而是就在當前的請求(執行緒)裡之心run方法,此時的run方法,根本不需要開闢或分配新執行緒來執行,而是當做一個普通方法來執行了。所以此時run方法卡住了,他就會卡住當前的請求,就會卡住web容器的請求。影響web容器處理其他請求的效能。
DiscardOldestPolicy 在我的佇列裡面,踢出出最老的一個任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
DiscardPolicy
不做任何處理
ThreadPool的三個階段
Workers容器
0<active<coresize
當一個task準備分配給workers容器,希望呼叫一個執行緒去執行它時,如果此時容器中存活的執行緒數小於coresize指定執行緒數時,會一次性建立一條新執行緒來執行任務,而且新執行緒也會駐留在記憶體中。而當執行緒執行完任務,並不會收回,而是變成等待狀態了。
問題:什麼時候出現activesize會超過coresize?
當coreSize向maxsize變遷的時候,不是由workers決定的,而是由queue決定的。queue裡面的task數量達到最大值的時候,coreSize就會向maxsize變遷了。我們在建立執行緒池的時候。執行緒池的構造方法會有一個BlockingQueue<Runnable> workQueue,然後我們初始化執行緒池時會指定這個queue的size,那麼呼叫者一邊往queue裡裝task,task也會一邊分配給workers去執行。只有當queue裡面的任務數,size達到了設定的最大size時,wokers才會去建立更多的執行緒,來處理任務,建立新執行緒的數量,不能超過maxsize。
core<active<maxsize
條件:任務queue滿了,會新建立執行緒去處理任務
active == maxsize
跟rejectHandlerPolicy有關係,配置了CallerRunsPolicy就會阻塞請求方,拒絕接受任務;配置了abortPolicy就會返回異常,意思是執行緒數已經創夠了,不能繼續建立了;配置了discardOldPolicy就會刪除最老任務,配置了discardPolicy就什麼都不做。
本文章參考了:https://blog.csdn.net/en_joker/article/details/84973420 《併發:Thread