全面講解程序池原理
執行緒池(Executor)
什麼是執行緒池?
Java5引入了新的稱為Executor
框架的併發API,以簡化程式設計師的工作。它簡化了多執行緒應用程式的設計和開發。它主要由Executor
、ExecutorService
介面和ThreadPoolExecutor
類組成,ThreadPoolExecutor
類同時實現Executor
和ExecutorService
介面。ThreadPoolExecutor
類提供執行緒池的實現。我們將在教程的後面部分了解更多。
執行緒池繼承關係圖
為什麼我們需要執行緒池?
當我們建立一個簡單的多執行緒應用程式時,我們建立Runnable
物件,並使用Runnable
Executor
框架為您做這件事。它負責建立、執行和管理執行緒,不僅如此,它還提高了應用程式的效能。
當您為每個任務建立一個新執行緒,然後如果系統高度過載,您將出現記憶體不足錯誤,系統將失敗,甚至丟擲oom異常。如果使用ThreadPoolExecutor
,則不會為新任務建立執行緒。將任務分配給有限數量的執行緒只去執行Runnable
,一旦執行緒完成一個任務,他將會去阻塞佇列中獲取Runnable
去執行。
如何建立執行緒池?
publicinterfaceExecutor{
voidexecute(Runnablecommand);
}
還有另一個名為ExecutorService的介面,它擴充套件了Executor介面。它可以被稱為Executor,它提供了可以控制終止的方法和可以生成未來跟蹤一個或多個非同步任務進度的方法。它有提交、關機、立即關機等方法。
ThreadPoolExecutor是ThreadPool的實際實現。它擴充套件了實現ExecutorService介面的AbstractThreadPoolExecutor。可以從Executor類的工廠方法建立ThreadPoolExecutor。建議使用一種方法獲取ThreadPoolExecutor的例項。
-
使用
Executors
工廠方法去建立執行緒池:
提供預設靜態方法
Executors類中有4個工廠方法可用於獲取ThreadPoolExecutor的例項。我們正在使用Executors的newFixedThreadPool獲取ThreadPoolExecutor的一個例項。
ThreadPoolExecutorthreadPoolExecutor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
方法 | 說明 |
---|---|
newFixedThreadPool(int nThreads) | 此方法返回執行緒池執行器,其最大大小(例如n個執行緒)是固定的 |
newCachedThreadPool() | 此方法返回一個無限執行緒池。 |
newSingleThreadedExecutor() | 此方法返回一個執行緒執行器,該執行器保證使用單個執行緒。 |
newScheduledThreadPool(int corePoolSize) | 這個方法返回一個固定大小的執行緒池,可以安排命令在給定的延遲後執行,或者定期執行 |
-
自定義
ThreadPoolExecutor
的建立執行緒池
提供預設建構函式
publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler);
引數 | 說明 |
---|---|
corePoolSize | 核心執行緒數 |
maximumPoolSize | 最大執行緒數 |
keepAliveTime | 執行緒保持存活的最大時間 |
unit | 時間單位 |
workQueue | 阻塞佇列 |
threadFactory | 執行緒工廠 |
handler | 拒絕策略 |
ThreadPoolExecutor原始碼分析
-
執行緒池內部狀態
privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));
privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;
privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;
//runStateisstoredinthehigh-orderbits
privatestaticfinalintRUNNING=-1<<COUNT_BITS;
privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;
privatestaticfinalintSTOP=1<<COUNT_BITS;
privatestaticfinalintTIDYING=2<<COUNT_BITS;
privatestaticfinalintTERMINATED=3<<COUNT_BITS;
//獲取執行緒狀態
privatestaticintrunStateOf(intc){returnc&~CAPACITY;}
//獲取work執行緒數
privatestaticintworkerCountOf(intc){returnc&CAPACITY;}
//制定狀態&執行緒數獲取ctl值
privatestaticintctlOf(intrs,intwc){returnrs|wc;}
ctl變數利用低29位表示執行緒池中執行緒數,通過高3位表示執行緒池的執行狀態:
RUNNING
:-1 << COUNT_BITS,即高3位為111,該狀態的執行緒池會接收新任務,並處理阻塞佇列中的任務;SHUTDOWN
: 0 << COUNT_BITS,即高3位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;STOP
: 1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在執行的任務;TIDYING
: 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;TERMINATED
: 3 << COUNT_BITS,即高3位為011, terminated
狀態轉換圖
下面帶大家分析下ThreadPoolExecutor
內部幾個核心方法:
-
新增任務:execute(Runnable command)
執行Runnable入口方法
publicvoidexecute(Runnablecommand){
if(command==null)
thrownewNullPointerException();
intc=ctl.get();
//workerCountOf獲取執行緒池的當前執行緒數;小於corePoolSize,執行addWorker建立新執行緒執行command任務
if(workerCountOf(c)<corePoolSize){
if(addWorker(command,true))
return;
c=ctl.get();
}
//doublecheck:c,recheck
//執行緒池處於RUNNING狀態,把提交的任務成功放入阻塞佇列中
if(isRunning(c)&&workQueue.offer(command)){
intrecheck=ctl.get();
//回滾到入隊操作前,即倘若執行緒池shutdown狀態,就remove(command)
//如果執行緒池沒有RUNNING,成功從阻塞佇列中刪除任務,執行reject方法處理任務
if(!isRunning(recheck)&&remove(command))
reject(command);
//執行緒池處於running狀態,但是沒有執行緒,則建立執行緒去執行佇列的任務。
elseif(workerCountOf(recheck)==0)
addWorker(null,false);
}
//往執行緒池中建立新的執行緒失敗,則reject任務
elseif(!addWorker(command,false))
reject(command);
}
新增任務流程圖
-
新增工作佇列 addWorker(Runnable firstTask, boolean core)
我們接下來看看如何新增worker執行緒的
privatebooleanaddWorker(RunnablefirstTask,booleancore){
retry:
for(;;){
intc=ctl.get();//讀取ctl的值
intrs=runStateOf(c);//獲取執行緒池的執行狀態
/*判斷當前執行緒池還是否需要執行任務
*如果當前執行緒池的狀態為RUNNING態則不會返回false
*返回false的條件(大前提:當前執行緒池狀態不是RUNNING態),在此基礎下下面三個條件有任何一個不成立都會直接返回,而不新建工作執行緒:
*1.當前執行緒池的狀態為SHUTDOWN態
*2.所提交任務為null
*3.阻塞佇列非空
*/
if(rs>=SHUTDOWN&&
!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))
returnfalse;
for(;;){
//獲取當前池中執行緒個數
intwc=workerCountOf(c);
/*
*若當前池中執行緒個數>=2的29次方減1,則無法建立新執行緒。池中最大執行緒數量為2的29次方減1個
*如果core為true則於核心先稱數量進行比較,否則與最大執行緒數量進行比較
*/
if(wc>=CAPACITY||
wc>=(core?corePoolSize:maximumPoolSize))
returnfalse;
//將workerCount的值加1,並跳出外層迴圈
if(compareAndIncrementWorkerCount(c))
breakretry;
//如果執行緒狀態被修改,則再次執行外層迴圈
c=ctl.get();
if(runStateOf(c)!=rs)
continueretry;
}
}
booleanworkerStarted=false;
booleanworkerAdded=false;
Workerw=null;
try{
/*
*此處建立Worker例項,並將任務firstTask設定進去
*注意Worker類中有兩個特殊的欄位:1.RunnablefirstTask2.finalThreadthread
*Worker類本身也繼承了Runnable介面,實現了其run()方法
*/
w=newWorker(firstTask);
//這裡的t是w本身表示的執行緒物件,而非firstTask。
finalThreadt=w.thread;
if(t!=null){
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
//獲取當前執行緒池的執行狀態rs
intrs=runStateOf(ctl.get());
/*
*rs<SHUTDOWN的狀態只有RUNNING態
*能進去下面if的條件:
*1.當前執行緒池執行狀態為RUNNING
*2.當前執行緒池狀態為SHUTDOWN而且firstTask為null
*/
if(rs<SHUTDOWN||
(rs==SHUTDOWN&&firstTask==null)){
if(t.isAlive())
thrownewIllegalThreadStateException();
//HashSet<Worker>workers執行緒池中利用HashSet儲存的worker物件
workers.add(w);
ints=workers.size();
//largestPoolSize用來記錄執行緒池中最大的執行緒數量
if(s>largestPoolSize)
largestPoolSize=s;
//任務新增成功(執行緒建立成功)
workerAdded=true;
}
}finally{
mainLock.unlock();
}
if(workerAdded){
//啟動工作執行緒,這裡呼叫的是Worker類中的run()方法
t.start();
workerStarted=true;
}
}
}finally{
if(!workerStarted)
addWorkerFailed(w);
}
return