1. 程式人生 > 實用技巧 >淺析執行緒池 ThreadPoolExecutor 原始碼

淺析執行緒池 ThreadPoolExecutor 原始碼

首先看下類的繼承關係,不多介紹:

public interface Executor {void execute(Runnable);}
public interface ExecutorService extends Executor {...} 
public abstract class AbstractExecutorService implements ExecutorService {...}
public class ThreadPoolExecutor extends AbstractExecutorService {...}

執行緒池構造器七大引數:
核心執行緒數,最大執行緒數,生存時間,時間單位,任務佇列,執行緒工廠,拒絕策略

public ThreadPoolExecutor(int corePoolSize,		//核心執行緒數
                          int maximumPoolSize,	        //最大執行緒數
                          long keepAliveTime,		//生存時間
                          TimeUnit unit,		//時間單位
                          BlockingQueue<Runnable> workQueue,	//任務佇列
                          ThreadFactory threadFactory,		//執行緒工廠
                          RejectedExecutionHandler handler)	//拒絕策略

先對執行緒池有個大概的概念:執行緒池,有若干個執行中的執行緒(工作者,Worker),負責從任務佇列(workQueue)中取任務(Task)出來,並執行它。

private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<Worker>();

這裡再大概介紹一下Worker類:
Worker類內部有兩個關鍵引用:執行緒Thread t、待執行任務Runnable firstTask
並且其自身就是Runnable

,其run()方法呼叫自身的runWorker()方法,稍後再來介紹runWorker()幹了啥。

回到執行緒池的使用:一般都是呼叫submit()或者execute()submit()只是把傳入的Runnable包裝成FutureTask來儲存執行結果,本質也是呼叫execute()方法。

因此我們主要分析execute()方法:

結合程式碼和註釋,可以得出其執行流程:public void execute(Runnable command)

  1. 不夠核心執行緒數的時候,起新執行緒(addWorker())
  2. 核心執行緒滿的時候把command放進workQueue佇列
  3. 核心執行緒和佇列都滿,不夠最大執行緒數的時候,起新執行緒
  4. 否則執行拒絕策略

其中最關鍵的當然是建立新執行緒執行任務的過程,addWorker()方法:

大概描述一下addWorker()的執行步驟:

  1. 雙重CAS把工作執行緒數加一
  2. new一個Worker w,並放入workers(HashSet)。
  3. 放入成功則執行w.t.start()(即會呼叫w.run()

其中,最初傳入的command作為wfirstTaskw.t是用執行緒工廠建立一個新執行緒,把w自己作為Runnable傳入。

w.run()方法直接執行runWorker()方法:

描述一下大概執行過程:

  1. 把 task 取出來:task = w.firstTask; w.firstTask = null;
  2. 首先執行 task ,然後迴圈從阻塞佇列 workQueue 中獲取一個 task 來執行
  3. 獲取不到任務時,結束執行。結束之前執行一些後續處理。

此外,有幾個小問題值得一提:

  • 非核心執行緒與核心執行緒的區別:

並沒有這種區別。從原始碼可以看到,addWorker()方法的引數boolean core並不會用於建立不同型別的Worker。只在新建Worker之前判斷“核心執行緒是否已滿”:core=true時,判斷工作執行緒數是否大於corePoolSize,是則返回false而不新建Workercore=false時,判斷工作執行緒數是否大於maximumPoolSize,是則返回false而不新建Worker

  • 那怎麼使得核心執行緒不被銷燬而非核心執行緒被銷燬呢?

可以看到,如果當前的工作執行緒數大於核心執行緒數,則從任務佇列中取任務的方法則從阻塞的take()方法換為超時等待keepAliveTime時長的poll()。當非核心執行緒閒置(任務佇列沒有任務)的時候,等待一會從getTask()方法返回null,於是執行緒結束。

其中allowCoreThreadTimeOut屬性指示keepAliveTime是否也會作用於核心執行緒。

並且,執行緒結束之前有“後續處理”:
可以看到,如果當前的工作執行緒數小於核心執行緒數,則新建一個沒有task的執行緒(等待任務佇列中的任務到來)。