Java執行緒--執行緒池原理
目錄
ThreadPoolExecutor
為什麼用執行緒池
/** * 傳統多執行緒程式碼編寫方式 */ class MyTask implements Runnable{ public void run(){} } public class Test{ public static void main(String[] args){ MyTask task = new MyTask(); Thread thread = new Thread(task); thread.start(); } }
傳統的方式編寫多執行緒:
1:新建執行緒,用來執行任務,任務執行完畢後,執行緒被銷燬。執行緒的頻繁新建/銷燬都是由JVM管理的,非常的消耗系統性能。
2:當任務比較小時,花在建立和銷燬執行緒上的時間會比任務真正執行的時間還長。尤其是如果有大量的任務時,執行緒的大量建立和銷燬,有記憶體溢位的風險。
執行緒池的方式編寫多執行緒:
執行緒池的出現,不但解決了以上兩個毛病,同時還帶來其它方面的優化和提升:
A:執行緒池中的執行緒是可以重用的,不用頻繁的建立和銷燬,提高了系統的效能。
B:執行緒池中的佇列可以管理大量的任務,任務的執行,排程,排隊,丟棄等事宜都由執行緒池來管理,做到任務可控。
C:執行緒池對執行緒進行一些維護和管理,比如執行緒定時執行,執行緒生命週期管理,多少個執行緒併發,執行緒執行的監控等。
執行緒池簡介
Executor介面:其內僅有execute(Runnable task);方法
ExecutorService介面:繼承Executor,對執行緒有更多的管理,比如常用的有:submit()方法、shutdown()方法等
ScheduledExecutorService介面:繼承ExecutorService,對執行緒又進一步的支援了定時執行的職能
AbstractExecutorService類:預設實現了ExecutorService介面中的部分方法
ThreadPoolExecutor類:我們常用的類,裡面的職能有:維護任務佇列,維護執行緒組,管理執行緒排程,執行,監控,等
ScheduledThreadPoolExecutor類:裡面的職能相對於父類ThreadPoolExecutor來說,多了對執行緒定時執行的職能
執行緒池只是併發程式設計中的一小部分,下圖是史上最全面的Java的併發程式設計學習技術總彙
執行緒池工作原理
/**
* 執行緒池的建構函式
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) //後兩個引數為可選引數
執行緒池的建構函式:
談到執行緒池的工作原理,首先要從執行緒池的建構函式說起:
引數名 | 中文名稱 | 業務說明(我以"快遞員派送包裹"來易於大家理解) |
corePoolSize | 核心執行緒數 | 有編制的正式快遞員員工個數 |
maximumPoolSize | 最大執行緒數 | 比如:雙11了,包裹量急劇增多,正式員工忙不過來,只能新招臨時工 =( 最大 - 核心 ) |
keepAliveTime | 臨時工呆多久 | 臨時工就是臨時工(包裹量不多的時候會被辭退的),能呆幾天呢,就是這個keepAliveTime |
unit | 臨時工呆多久的計量單位 | 比如:臨時工呆多少小時,那麼unit就計量單位為小時;臨時工能呆多少天,unit計量單位就是天;臨時工能呆多少月,unit計量單位就是月....等等 |
workQueue | 任務佇列 | 需要派送的大量包裹儲存的地方 |
threadFactory | 執行緒工廠 | 使用ThreadFactory建立新執行緒,預設使用defaultThreadFactory建立執行緒 |
handle | 異常處理 | 包裹實在太多,多到正式員工和臨時工一起派送都忙不過來,另外存放包裹的地方都被撐爆了,實在沒地方存這些包裹了。那麼這時仍源源不斷新來的包裹我們的處理方案就是handle |
執行緒池工作原理描述:
1:有新任務了,儘可能的讓核心執行緒去執行;
2:核心執行緒都在忙了,在來的任務就放到佇列中去排隊等待被排程;
3:佇列中都塞滿了任務,還來新任務,就臨時招募非核心執行緒來執行剛到的新任務;
4:現在情況更甚,佇列滿了,核心執行緒都在忙,非核心執行緒也都在忙,還來新任務,那麼只能啟用安全策略;
5:安全策略來異常處理仍源源不斷到來的新任務,安全策略決定丟棄新來的任務呢,還是其它處理方案。
執行緒池工作原理圖解:
執行緒池工作原理的流程圖:
上述動畫圖示,翻譯成程式的流程圖如下:
執行緒池的安全策略:
看到圖示中,當新任務的到來,無法被執行緒池接納時,安全策略(也叫飽和策略)來處理這種異常,策略有哪些呢?看下錶
策略:
策略名稱 | 業務含義 |
---|---|
AbortPolicy | 預設策略,不執行此任務,而且直接丟擲RuntimeException 切記execute()需要try catch,否則程式會直接退出 |
DiscardPolicy | 直接拋棄,任務不執行,空方法 |
DiscardOldestPolicy | 從佇列裡面拋棄head的一個任務,並再次嘗試呼叫execute(task); |
CallerRunsPolicy | 當前執行緒呼叫的execute(task)方法,當前執行緒阻塞在這裡,直至task執行完畢 |
自定義策略 | (常用)自定義類實現RejectedExecutionHandler。例:可以先把任務寫入檔案或者資料庫,以防止任務丟棄 |
執行緒池的三種常用佇列:
看到圖示中,假定佇列是有界的,是不是說還有無界的佇列呢?還有更多種佇列嘛?常用的三種佇列,看下錶
佇列:
佇列名稱 | 邊界 | 業務含義 |
---|---|---|
SynchronousQueue | 有界,值1 | 在某執行緒新增元素後必須等待其他執行緒取走後才能繼續新增(可以觀看我的部落格:生產者消費者產1消1模式類似的業務場景) |
LinkedBlockingQueue | 無界|有界 均可 | 連結串列儲存結構,刪除操作代價高。
若初始化的時候,給予了初始值就是有界的,反之是無界的。
FIFO原則,隊頭head處取出任務,隊尾tail處插入任務。
高效的併發效能:一個原子AtomicInteger的佇列元素個數count,一把消費者互斥鎖,一把生成者互斥鎖,FIFO原則,組合在一起構成了高併發效能。往隊尾插入任務時的生產鎖由多個生產者互斥訪問;從隊頭取出任務時的消費鎖由多個消費者互斥訪問;插入也好,取出也罷,插入後/取出後,同步的更改佇列內元素個數;(可以觀看我的部落格:停車場類似的業務場景)。 |
ArrayListBlockingQueue | 有界 | 陣列儲存結構,遍歷是速度快的,因為陣列是連續儲存的,但是它的操作比如:移出操作是較慢的,因為要重新排序受影響的元素。為了解決移出問題,可以將此陣列想象成為一個迴圈陣列,並且配備兩個指標,兩個指標順時針方向走位。可參考 部落格
初始化的時候,給予初始值,所以是有界的。
FIFO原則,隊頭takeIndex處取出任務,隊尾putIndex處插入任務。
併發性:在生產者放入資料和消費者獲取資料,共用同一個互斥鎖物件ReentrantLock,由此也意味著兩者無法真正並行執行,就是說,ArrayListBlockingQueue是被互斥訪問的,只允許單一執行緒獲得該鎖後才能進行業務邏輯的執行,執行完後,釋放鎖。 |
執行緒池中的執行緒為什麼不回收?
我們平時編寫的Java程式碼,當new出來一個物件後,這個物件被訪問使用過後,我們是不用關心物件的回收的,是JVM虛擬機器的gc垃圾回收機制,自動幫我們回收沒用的物件。那麼,就有疑問,為什麼Executor執行緒池物件不被回收,執行緒池中的執行緒也不會gc回收呢?這要從原始碼處著眼分析:
我們平時在使用執行緒池的時候,都是直接 執行緒池.execute(Runnable);看看ThreadPoolExecutor類的原始碼execute(Runnable)方法的內部邏輯:
execute()方法:
/**
* ThreadPoolExecutor類的部分原始碼(我裁剪掉了一部分)
*/
/**
* 配合我的執行緒池工作原理圖解來看原始碼,容易理解些
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
* 當前幾個員工在忙(圖解1)
*/
int c = ctl.get();
/**
* 執行緒池先儘可能的讓所有正式員工都上班(圖解2)
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //員工上班,且立馬執行新任務
return;
c = ctl.get();
}
/**
* 所有核心執行緒都在忙,任務入佇列(圖解3)
*/
if (isRunning(c) && workQueue.offer(command)) {//執行緒池沒關閉並且新任務入佇列成功
int recheck = ctl.get();//再次檢查當前幾個員工在忙(因為CPU是指令執行級別的,上面兩檢查完畢後,還不知道各個執行緒都忙成什麼樣了呢)
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 所有核心執行緒都在忙,任務入佇列失敗(佇列已滿),嘗試招募臨時工(圖解4)
*/
else if (!addWorker(command, false))
/**
* 預算有限,臨時工都招滿了,當前的新任務只能被安全策略異常處理(圖解5)
*/
reject(command);
}
通過以上原始碼會發現,主要的是addWorker()方法,這裡面伴隨著員工上班,並且立馬乾活去執行任務,讓我們繼續分析:
addWorker()方法:
/**
* 部分原始碼,我已裁剪掉一部分
*/
private boolean addWorker(Runnable firstTask, boolean core) {
/**
* 新建一個Worker,並立馬讓worker工作,t.start();
*/
w = new Worker(firstTask);
final Thread t = w.thread;
t.start();
}
通過以上原始碼會發現,主要的是new Worker()方法,這裡Worker是個包裝器,讓我們繼續分析:
new Worker() 構造器:
/**
* Worker類的部分原始碼
*
* Worker類實現了Runnable介面
*
* Worker類包裝了firstTask
*
* 呼叫worker.thread.start()方法,執行run()方法,run()方法內部呼叫runWorker(this);
*/
class Worker extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; //Worker類包裝了firstTask
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
}
/**
* Worker類實現了Runnable介面
* Worker類包裝了firstTask
* 呼叫worker.thread.start()方法,執行run()方法,run()方法內部呼叫runWorker(this);
*/
通過以上原始碼會發現,主要的是Worker類的runWorker(this);方法讓我們繼續分析:
runWorker(this)方法:
/**
* 部分原始碼,已被我裁剪一部分
*/
final void runWorker(Worker w) {
Runnable task = w.firstTask;
/**
* 這就是worker不會被回收,不知疲倦的始終執行任務的根本所在
*
* 執行當前任務,執行完畢後,當前任務為null; 進而 從Queue中不停的取出任務去執行
*
* 這個埋個伏筆,從Queue佇列中再也拿不到任務了,是不是說當前worker就要消亡?那就要分析getTask()
*/
while (task != null || (task = getTask()) != null) {
task.run();
}
}
/**
* 這就是worker不會被回收,不知疲倦的始終執行任務的根本所在
*
* 執行當前任務,執行完畢後,當前任務為null; 進而 從Queue中不停的取出任務去執行
*
* 這個埋個伏筆,從Queue佇列中再也拿不到任務了,是不是說當前worker就要消亡?那就要接著分析getTask()方法
*/
通過以上原始碼會發現,執行緒worker被新建之後,就執行firstTask,firstTask執行完畢之後(firstTask=null),並不被JVM的gc垃圾回收機制回收,因為它還在死迴圈,不停的從佇列中取出任務來執行。那如何保證getTask();方法就一定能取到任務呢?讓我們繼續分析getTask()方法:
getTask()方法:
/**
* 部分原始碼,已被我裁剪一部分
*/
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
//預設值false 意思是不回收核心執行緒,怎麼做到不回收呢?看程式碼workQueue.take();有阻塞功效,當前執行緒拿不到任務時,就阻塞在這裡,直到拿到新任務
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
private Runnable getTask() {
/**
* 死迴圈:直到互斥的訪問佇列時,能從佇列中取出一個任務
*/
for (;;) {
/**
* allowCoreThreadTimeOut 若為真,允許核心執行緒經過keepAliveTime時間後回收銷燬
* allowCoreThreadTimeOut 為假時(預設值false)
* wc > corePoolSize 為真,說明有臨時工在忙
* wc > corePoolSize 為假,說明沒臨時工
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;//回收執行緒 因為返回null後,呼叫該getTask()方法的上層入口方法runWorker()就退出了死迴圈,進而導致執行緒被gc回收銷燬
continue;
}
/**
* timed 為真, 當前執行緒poll()取任務,如果沒有任務就進入下一次迴圈
* timed 為假,當前執行緒一定是核心執行緒,就take()取任務,如果沒有任務就阻塞在這裡,核心執行緒不被gc回收的真諦
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();//執行緒永不回收的祕密(取不到任務,我就阻塞在這裡,所以我不會被gc回收)
if (r != null)
return r;
}
}
通過以上原始碼分析,從隊列當中獲取任務時,也是個死迴圈,不停的嘗試取得任務。這裡有:回收執行緒的程式碼 return null;這裡也有:核心執行緒永不回收的真諦 workQueue.take();[當然核心執行緒也是可以有回收的機會的,當allowCoreThreadTimeOut為真時,在當前佇列任務中所有的任務都執行完畢並且再也沒新任務到來時,核心執行緒就會在經過keepAliveTime時長後被gc回收]。那說到這,又引申出一個新的話題:所有的任務何時被執行完呢,編寫程式時,有辦法知道嘛?
總結
執行緒池的執行緒不會被gc回收,就是因為執行緒池用一堆包裝的Wroker類的集合,在裡面有條件的進行著死迴圈,從而可以不斷接受任務來進行。
執行緒池不回收我們怎麼辦?
上一小節,我們探討了執行緒池為什麼高效,就是因為它內部管理的一組執行緒不gc回收,所以不會發生頻繁的建立和銷燬執行緒,用有限的執行緒反覆的重用去執行佇列中的大量任務,提高了系統的效能。
那這一小節我們探討的是,不回收執行緒也不是絕對的一件好事,我所有的任務都執行完了,再也沒有新任務來了,你還不回收執行緒,阻塞在這裡,這也是浪費系統資源的。進而,我們不僅要問,程式上有沒有辦法知道所有的任務是何時被執行完的呢?如果都執行完了,我怎麼回收這些阻塞著的執行緒呢?詳解如下:
allowCoreThreadTimeOut
allowCoreThreadTimeOut預設是false的,當其設定為true時,是有機會釋放核心執行緒的,示例程式碼如下:
/**
* 演示回收核心執行緒:allowCoreThreadTimeOut
*/
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class ExecutorTest{
public static void main(String[] args){
BlockingQueue queue = new LinkedBlockingQueue();
ThreadPoolExecutor e = new ThreadPoolExecutor(3, 6, 1, TimeUnit.SECONDS,queue);
/**
* 一般不建議這樣使用,我們更多的是使用shutDown()方法
*
* 這一句如果註釋掉,所有執行緒都執行完任務後,執行緒池裡還有3個核心執行緒是阻塞在這裡的
*/
e.allowCoreThreadTimeOut(true);
for(int i=0;i<10;i++){
e.execute(new Runnable(){
public void run(){
try{
Thread.sleep((long)(Math.random()*100));
System.out.println(Thread.currentThread().getName()+"子執行緒執行完");
}catch(Exception e){}
}
});
}
}
}
shutdown()
建議使用shutdown()方法,來使得所有執行緒池內的任務都順利執行完畢後才回收執行緒。之所以能回收,是執行緒池內部呼叫了interrupt()方法,來使得所有getTask(){queue.take();}時被阻塞的執行緒被中斷。
/**
* shutdown方法的部分程式碼:
*/
public void shutdown() {
interruptIdleWorkers(); //將工作者worker進行阻斷(即:執行緒.interrupt();)
tryTerminate(); //執行緒池終結
}
/**
* 核心執行緒處理完任務佇列中的任務後,都在那裡痴痴的等待新任務而被阻塞,這也是執行緒池不回收核心執行緒的根本所在workQueue.take();
* 為此,不能讓執行緒池總是傻傻的在那等待新任務,執行緒池想要關閉了,於是有了本處的t.interrupt();呼叫
*/
private void interruptIdleWorkers(boolean onlyOne) {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock())
t.interrupt(); //核心執行緒之所以能回收:因為核心執行緒是阻塞狀態的
}
}
這裡一定要注意:
執行該方法時,執行緒池的狀態則立刻變成SHUTDOWN狀態。此時,則不能再往執行緒池中新增任何任務,否則將會丟擲RejectedExecutionException異常(也就是說:程式碼中呼叫executor.shutdown()方法之後,後續的程式碼部分不能在出現executor.execute()或者executor.submit()的呼叫)。
但是,此時執行緒池不會立刻退出,直到新增到執行緒池中的任務都已經處理完成,才會退出。
/**
* 演示回收核心執行緒:shutdown()方法
*/
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class ExecutorTest{
public static void main(String[] args){
BlockingQueue queue = new LinkedBlockingQueue();
ThreadPoolExecutor e = new ThreadPoolExecutor(3, 6, 1, TimeUnit.SECONDS,queue);
for(int i=0;i<10;i++){
e.execute(new Runnable(){
public void run(){
try{
Thread.sleep((long)(Math.random()*100));
System.out.println(Thread.currentThread().getName()+"子執行緒執行完");
}catch(Exception e){}
}
});
}
/**
* 建議使用shutDown()方法,來使得所有執行緒池內的任務都順利執行完畢後,回收執行緒,之所以能回收,是執行緒池內部呼叫了interrupt()方法,來使得所有getTask(){queue.take();}時被阻塞的執行緒被中斷
*
* 這一句如果註釋掉,所有執行緒都執行完任務後,執行緒池裡還有3個核心執行緒是阻塞在這裡的
*/
e.shutdown();
while (true) {
if (e.isTerminated()) {
System.out.println("所有子執行緒都徹底結束了!");
break;
}
try{
Thread.sleep(200);
}catch(Exception e1){}
}
/**
* e.isTerminated();
* 當shutdown()或shutdownNow()執行了之後才會執行,並返回true。
* 不呼叫shutdown()或shutdownNow()而直接呼叫isTerminated()永遠返回false。
*
* 通過while(true){Thread.sleep(200);}來死迴圈對cpu的佔用,資源的浪費。讓它睡一會,可以釋放cpu
*/
}
}
shutdownNow()
一般不建議使用shutdownNow()方法,原因是:它不再處理已經加入到佇列中的排隊等待的任務。
執行該方法,執行緒池的狀態立刻變成STOP狀態,並試圖停止所有正在執行的執行緒,不再處理還在池佇列中等待的任務,當然,它會返回那些未執行的任務。它試圖終止執行緒的方法是通過呼叫Thread.interrupt()方法來實現的,但是大家知道,這種方法的作用有限, 如果執行緒中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的執行緒的。所以,ShutdownNow()並不代表執行緒池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成了才能退出。
isShutDown()當呼叫shutdown()或shutdownNow()方法後,不論執行緒池中的任務是否完成,立馬返回為true。
isTerminated()當呼叫shutdown()方法後,並且等到所有執行緒池中的任務都完成後,才返回為true。
執行緒池的運用
要想合理地配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來分析。
1、任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
2、任務的優先順序:高、中和低。
3、任務的執行時間:長、中和短。
4、任務的依賴性:是否依賴其他系統資源,如資料庫連線。
性質不同的任務可以用不同規模的執行緒池分開處理。CPU密集型任務應配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。由於IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先執行
如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。執行時間不同的任務可以交給不同規模的執行緒池來處理,或者可以使用優先順序佇列,讓執行時間短的任務先執行。依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼執行緒數應該設定得越大,這樣才能更好地利用CPU。
建議使用有界佇列。有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有時候我們系統裡後臺任務執行緒池的佇列和執行緒池全滿了,不斷丟擲拋棄任務的異常,通過排查發現是資料庫出現了問題,導致執行SQL變得非常緩慢,因為後臺任務執行緒池裡的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池裡的工作執行緒全部阻塞,任務積壓線上程池裡。如果當時我們設定成無界佇列,那麼執行緒池的佇列就會越來越多,有可能會撐滿記憶體,導致整個系統不可用,而不只是後臺任務出現問題。當然,我們的系統所有的任務是用單獨的伺服器部署的,我們使用不同規模的執行緒池完成不同型別的任務,但是出現這樣問題時也會影響到其他任務。
執行緒池的監控
如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,可以根據執行緒池的使用狀況快速定位問題。可以通過執行緒池提供的引數進行監控,在監控執行緒池的時候可以使用以下屬性
- taskCount:執行緒池需要執行的任務數量。
- completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount。
- largestPoolSize:執行緒池裡曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。
- getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。
- getActiveCount:獲取活動的執行緒數。
通過擴充套件執行緒池進行監控。可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。