1. 程式人生 > 其它 >全面講解程序池原理

全面講解程序池原理

執行緒池(Executor)

什麼是執行緒池?

Java5引入了新的稱為Executor框架的併發API,以簡化程式設計師的工作。它簡化了多執行緒應用程式的設計和開發。它主要由ExecutorExecutorService介面和ThreadPoolExecutor類組成,ThreadPoolExecutor類同時實現ExecutorExecutorService介面。ThreadPoolExecutor類提供執行緒池的實現。我們將在教程的後面部分了解更多。

執行緒池繼承關係圖

為什麼我們需要執行緒池?

當我們建立一個簡單的多執行緒應用程式時,我們建立Runnable物件,並使用Runnable

構造執行緒物件,我們需要建立、執行和管理執行緒。我們可能很難做到這一點。Executor框架為您做這件事。它負責建立、執行和管理執行緒,不僅如此,它還提高了應用程式的效能。

當您為每個任務建立一個新執行緒,然後如果系統高度過載,您將出現記憶體不足錯誤,系統將失敗,甚至丟擲oom異常。如果使用ThreadPoolExecutor,則不會為新任務建立執行緒。將任務分配給有限數量的執行緒只去執行Runnable,一旦執行緒完成一個任務,他將會去阻塞佇列中獲取Runnable去執行。

如何建立執行緒池?


publicinterfaceExecutor{
voidexecute(Runnablecommand);
}

還有另一個名為ExecutorService的介面,它擴充套件了Executor介面。它可以被稱為Executor,它提供了可以控制終止的方法和可以生成未來跟蹤一個或多個非同步任務進度的方法。它有提交、關機、立即關機等方法。

ThreadPoolExecutor是ThreadPool的實際實現。它擴充套件了實現ExecutorService介面的AbstractThreadPoolExecutor。可以從Executor類的工廠方法建立ThreadPoolExecutor。建議使用一種方法獲取ThreadPoolExecutor的例項。

  • 使用Executors工廠方法去建立執行緒池:

提供預設靜態方法

Executors類中有4個工廠方法可用於獲取ThreadPoolExecutor的例項。我們正在使用Executors的newFixedThreadPool獲取ThreadPoolExecutor的一個例項。

ThreadPoolExecutorthreadPoolExecutor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
方法說明
newFixedThreadPool(int nThreads) 此方法返回執行緒池執行器,其最大大小(例如n個執行緒)是固定的
newCachedThreadPool() 此方法返回一個無限執行緒池。
newSingleThreadedExecutor() 此方法返回一個執行緒執行器,該執行器保證使用單個執行緒。
newScheduledThreadPool(int corePoolSize) 這個方法返回一個固定大小的執行緒池,可以安排命令在給定的延遲後執行,或者定期執行
  • 自定義ThreadPoolExecutor的建立執行緒池

提供預設建構函式

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueueworkQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler);
引數說明
corePoolSize 核心執行緒數
maximumPoolSize 最大執行緒數
keepAliveTime 執行緒保持存活的最大時間
unit 時間單位
workQueue 阻塞佇列
threadFactory 執行緒工廠
handler 拒絕策略

ThreadPoolExecutor原始碼分析

  • 執行緒池內部狀態

privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));
privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;
privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;
//runStateisstoredinthehigh-orderbits
privatestaticfinalintRUNNING=-1<<COUNT_BITS;
privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;
privatestaticfinalintSTOP=1<<COUNT_BITS;
privatestaticfinalintTIDYING=2<<COUNT_BITS;
privatestaticfinalintTERMINATED=3<<COUNT_BITS;

//獲取執行緒狀態
privatestaticintrunStateOf(intc){returnc&~CAPACITY;}
//獲取work執行緒數
privatestaticintworkerCountOf(intc){returnc&CAPACITY;}
//制定狀態&執行緒數獲取ctl值
privatestaticintctlOf(intrs,intwc){returnrs|wc;}

ctl變數利用低29位表示執行緒池中執行緒數,通過高3位表示執行緒池的執行狀態:

  • RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態的執行緒池會接收新任務,並處理阻塞佇列中的任務;
  • SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;
  • STOP: 1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在執行的任務;
  • TIDYING: 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;
  • TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated

狀態轉換圖

下面帶大家分析下ThreadPoolExecutor內部幾個核心方法:

  • 新增任務:execute(Runnable command)

執行Runnable入口方法

publicvoidexecute(Runnablecommand){
if(command==null)
thrownewNullPointerException();
intc=ctl.get();
//workerCountOf獲取執行緒池的當前執行緒數;小於corePoolSize,執行addWorker建立新執行緒執行command任務
if(workerCountOf(c)<corePoolSize){
if(addWorker(command,true))
return;
c=ctl.get();
}
//doublecheck:c,recheck
//執行緒池處於RUNNING狀態,把提交的任務成功放入阻塞佇列中
if(isRunning(c)&&workQueue.offer(command)){
intrecheck=ctl.get();
//回滾到入隊操作前,即倘若執行緒池shutdown狀態,就remove(command)
//如果執行緒池沒有RUNNING,成功從阻塞佇列中刪除任務,執行reject方法處理任務
if(!isRunning(recheck)&&remove(command))
reject(command);
//執行緒池處於running狀態,但是沒有執行緒,則建立執行緒去執行佇列的任務。
elseif(workerCountOf(recheck)==0)
addWorker(null,false);
}
//往執行緒池中建立新的執行緒失敗,則reject任務
elseif(!addWorker(command,false))
reject(command);
}

新增任務流程圖

  • 新增工作佇列 addWorker(Runnable firstTask, boolean core)

我們接下來看看如何新增worker執行緒的

privatebooleanaddWorker(RunnablefirstTask,booleancore){
retry:
for(;;){
intc=ctl.get();//讀取ctl的值
intrs=runStateOf(c);//獲取執行緒池的執行狀態

/*判斷當前執行緒池還是否需要執行任務
*如果當前執行緒池的狀態為RUNNING態則不會返回false
*返回false的條件(大前提:當前執行緒池狀態不是RUNNING態),在此基礎下下面三個條件有任何一個不成立都會直接返回,而不新建工作執行緒:
*1.當前執行緒池的狀態為SHUTDOWN態
*2.所提交任務為null
*3.阻塞佇列非空
*/
if(rs>=SHUTDOWN&&
!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))
returnfalse;

for(;;){
//獲取當前池中執行緒個數
intwc=workerCountOf(c);
/*
*若當前池中執行緒個數>=2的29次方減1,則無法建立新執行緒。池中最大執行緒數量為2的29次方減1個
*如果core為true則於核心先稱數量進行比較,否則與最大執行緒數量進行比較
*/
if(wc>=CAPACITY||
wc>=(core?corePoolSize:maximumPoolSize))
returnfalse;
//將workerCount的值加1,並跳出外層迴圈
if(compareAndIncrementWorkerCount(c))
breakretry;

//如果執行緒狀態被修改,則再次執行外層迴圈
c=ctl.get();
if(runStateOf(c)!=rs)
continueretry;
}
}

booleanworkerStarted=false;
booleanworkerAdded=false;
Workerw=null;
try{
/*
*此處建立Worker例項,並將任務firstTask設定進去
*注意Worker類中有兩個特殊的欄位:1.RunnablefirstTask2.finalThreadthread
*Worker類本身也繼承了Runnable介面,實現了其run()方法
*/
w=newWorker(firstTask);
//這裡的t是w本身表示的執行緒物件,而非firstTask。
finalThreadt=w.thread;
if(t!=null){
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
//獲取當前執行緒池的執行狀態rs
intrs=runStateOf(ctl.get());

/*
*rs<SHUTDOWN的狀態只有RUNNING態
*能進去下面if的條件:
*1.當前執行緒池執行狀態為RUNNING
*2.當前執行緒池狀態為SHUTDOWN而且firstTask為null
*/
if(rs<SHUTDOWN||
(rs==SHUTDOWN&&firstTask==null)){
if(t.isAlive())
thrownewIllegalThreadStateException();
//HashSet<Worker>workers執行緒池中利用HashSet儲存的worker物件
workers.add(w);
ints=workers.size();
//largestPoolSize用來記錄執行緒池中最大的執行緒數量
if(s>largestPoolSize)
largestPoolSize=s;
//任務新增成功(執行緒建立成功)
workerAdded=true;
}
}finally{
mainLock.unlock();
}
if(workerAdded){
//啟動工作執行緒,這裡呼叫的是Worker類中的run()方法
t.start();
workerStarted=true;
}
}
}finally{
if(!workerStarted)
addWorkerFailed(w);
}

returnworkerStarted;
}
  • 執行任務: runWorker(Worker w)

在addWorker成功後會呼叫Worker的start()方法,接下來來分析下如何執行任務的。

finalvoidrunWorker(Workerw){
//獲取當前執行的執行緒物件
Threadwt=Thread.currentThread();
//獲取第一個任務
Runnabletask=w.firstTask;
w.firstTask=null;
w.unlock();//允許中斷
booleancompletedAbruptly=true;
try{
//task任務不為空或者getTask()獲取任務不為空時候進入迴圈
while(task!=null||(task=getTask())!=null){
w.lock();
//如果執行緒狀態>STOP或者當前執行緒被中斷時候這時候呼叫wt.interrupt()去中斷worker執行緒
if((runStateAtLeast(ctl.get(),STOP)||
(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())
wt.interrupt();
try{
//在ThreadPoolExecutor中該方法是一個空方法
beforeExecute(wt,task);
Throwablethrown=null;
try{
//執行任務。
task.run();
}catch(RuntimeExceptionx){
thrown=x;throwx;
}catch(Errorx){
thrown=x;throwx;
}catch(Throwablex){
thrown=x;thrownewError(x);
}finally{
afterExecute(task,thrown);
}
}finally{
task=null;
//任務計數器加1
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
//如果執行任務的過程中沒有發生異常,則completedAbruptly會被賦值為false
completedAbruptly=false;
}finally{
processWorkerExit(w,completedAbruptly);
}
}

看到這裡我們還沒看到當worker執行緒數>coreSize時候是如何去回收執行緒的,不用著急,接下來我們去看下getTask()方法。

  • 獲取task任務: getTask()

privateRunnablegetTask(){
booleantimedOut=false;

for(;;){
intc=ctl.get();
intrs=runStateOf(c);

/*
*若當前執行緒池的工作狀態為RUNNING則不會進入下面if。
*1.若狀態為STOP、TIDYING、TERMINATED則當前工作執行緒不能執行任務。
*2.若狀態為SHUTDOWN,且阻塞佇列為空,則獲取任務為null
*/
if(rs>=SHUTDOWN&&(rs>=STOP||workQueue.isEmpty())){
//workerCount的值減1
decrementWorkerCount();
returnnull;
}
//獲取工作執行緒數量
intwc=workerCountOf(c);

//若allowCoreThreadTimeOut設定為true或者當前池中工作執行緒數量大於核心執行緒數量則timed為true
booleantimed=allowCoreThreadTimeOut||wc>corePoolSize;

//若當前工作執行緒數量已經超過最大執行緒數量,則也獲取不到任務,會從該方法中返回null,進而結束該工作執行緒
if((wc>maximumPoolSize||(timed&&timedOut))&&(wc>1||workQueue.isEmpty())){
if(compareAndDecrementWorkerCount(c))
returnnull;
continue;
}

try{
/*
*若allowCoreThreadTimeOut設定為true或者當前池中工作執行緒數量大於核心執行緒數量
*則:在指定的時間內從阻塞佇列中獲取任務,若取不到則返回null
*若allowCoreThreadTimeOut設定為false而且當前池中工作執行緒數量小於核心執行緒數量
*則:在指定的時間內從阻塞佇列中獲取任務,若取不到則一直阻塞
*/
Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();
if(r!=null)
returnr;
//若r==null,則此處timedOut的值被設定為true
timedOut=true;
}catch(InterruptedExceptionretry){
//如果阻塞等待過程中執行緒發生中斷,則將timeOut設定為false,進入下一次迴圈
timedOut=false;
}
}
  • 關閉執行緒: shutdown()

publicvoidshutdown(){
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
//檢測是否有關閉執行緒池的許可權
checkShutdownAccess();
//將執行緒池狀態設定為SHUTDOWN態
advanceRunState(SHUTDOWN);
//中斷空閒執行緒(沒有執行任務的執行緒)
interruptIdleWorkers();
//該方法在ThreadPoolExecutor中是一個空方法
onShutdown();
}finally{
mainLock.unlock();
}
//嘗試將執行緒池狀態設定為TERMINATED狀態。
tryTerminate();
  • 立即關閉執行緒: shutdownNow()

此方法會中斷任務執行,返回未執行的task

publicList<Runnable>shutdownNow(){
List<Runnable>tasks;
//加鎖
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
checkShutdownAccess();
//直接設定STOP狀態
advanceRunState(STOP);
interruptWorkers();
//丟棄未執行的task,返回
tasks=drainQueue();
}finally{
mainLock.unlock();
}
tryTerminate();
returntasks;
}

執行緒池使用注意事項

  • 使用ThreadLocal

ThreadLocal 稱為執行緒本地儲存,一般作為靜態域使用,它為每一個使用它的執行緒提供一個其值(value)的副本。通常對資料庫連線(Connection)和事務(Transaction)使用執行緒本地儲存。 可以簡單地將 ThreadLocal 理解成一個容器,它將 value 物件儲存在 Map<Thread, T> 域中,即使用當前執行緒為 key 的一個 Map,ThreadLocal 的 get() 方法從 Map 裡取與當前執行緒相關聯的 value 物件。ThreadLocal 的真正實現並不是這樣的,但是可以簡單地這樣理解。執行緒池中的執行緒在任務執行完成後會被複用,所以線上程執行完成時,要對 ThreadLocal 進行清理(清除掉與本執行緒相關聯的 value 物件)。不然,被複用的執行緒去執行新的任務時會使用被上一個執行緒操作過的 value 物件,從而產生不符合預期的結果。

  • 設定合理的執行緒數

新手可能對使用執行緒池有一個誤區,併發越高使用更多執行緒數,然而實際的情況就是過多的執行緒會造成系統大量的Context-Switch從而影響系統的吞吐量,所以合理的執行緒數需要結合專案進行壓測,一般我們主要針對2種類型的任務設定執行緒數規則為:

  1. cpu密集型

    coreSize == cpu核心數+1

  2. Io密集型

    coreSize == 2*cpu核心數

結束