JAVA執行緒池(ThreadPoolExecutor)原始碼分析
阿新 • • 發佈:2019-02-03
中使用ThreadPoolExecutor的常用方式:
例項程式碼1
Java程式碼
在分析ThreadPoolExecutor原始碼前,先了解下面兩個概念:
1.核心執行緒(任務): 我們定義的執行緒,即實現了Runnable介面的類,是我們將要放到執行緒池中執行的類,如例項程式碼中的CountService類
2.工作執行緒:由執行緒池中建立的執行緒,是用來獲得核心執行緒並執行核心執行緒的執行緒(比較拗口哦,具體看程式碼就知道是什麼東東了)。
Executors是一個執行緒池工廠,各種型別的執行緒池都是通過它來建立的,注意把它和Executor分開,感覺這個執行緒池工廠命名有點問題。
我們主要分析下我們提交任務的處理邏輯,即’execute.submit(runnable)’的實現。
Submit()方法是在ThreadPoolExecutor繼承的抽象類AbstractExecutorService中實現的,具體程式碼如下:
Java程式碼
從程式碼中可以看出,執行緒的執行邏輯通過execute()完成,而execute是在AbstractExecutorService的子類ThreadPoolExecutor中實現的。看,一個典型的模板模式!廢話少說,下面看ThreadPoolExecutor中execute()方法中程式碼:
Java程式碼
注意:CachedThreadPool和FixedThreadPool的邏輯實現都是在ThreadPoolExecutor中實現的。它兩的主要區別就是屬性corePoolSize以及workQueue的初始值的不同。具體可自己檢視工程類Executors的newFixedThreadPool()和newCachedThreadPool方法。由於這些初始值的不同,所以實現的邏輯也不同,具體的我在程式碼中已經註釋了。
command執行緒執行的整個邏輯在 addIfUnderCorePoolSize(command)方法中實現的,
詳細請看addIfUnderCorePoolSize(command)原始碼:
Java程式碼
看’t.start()’,這表示工作執行緒啟動了,工作執行緒t啟動的前提條件是’t = addThread(firstTask); ‘返回值t必須不為null。好了,現在想看看java執行緒池中工作執行緒是怎麼樣的嗎?請看addThread方法:
Java程式碼
看見沒,Worker就是工作執行緒類,它是ThreadPoolExecutor中的一個內部類。下面,我們主要分析Worker類,如瞭解了Worker類,那基本就瞭解了java執行緒池的整個原理了。不用怕,Worker類的邏輯很簡單,它其實就是一個執行緒,實現了Runnable介面的,所以,我們先從run方法入手,run方法原始碼如下:
Java程式碼
從原始碼中可看出,我們所提交的核心執行緒(任務)的邏輯是在Worker中的runTask()方法中實現的。這個方法很簡單,自己可以開啟看看。這裡要注意一點,在runTask()方法中執行核心執行緒時是呼叫核心執行緒的run()方法,這是一個尋常方法的呼叫,千萬別與執行緒的啟動(start())混合了。這裡還有一個比較重要的方法,那就是上述程式碼中while迴圈中的getTask()方法,它是一個從池佇列中取的核心執行緒(任務)的方法。具體程式碼如下:
Java程式碼
從這個方法中,我們需要了解一下幾點:
1.CachedThreadPool獲得任務邏輯是條件1,條件1的處理邏輯請看註釋,CachedThreadPool執行條件1的原因是:CachedThreadPool的corePoolSize時刻為0。
2.FixedThreadPool執行的邏輯為條件2,從’workQueue.take()’中我們就明白了為什麼FixedThreadPool不會釋放工作執行緒的原因了(除非你關閉執行緒池)。
最後,我們瞭解下Worker(工作執行緒)終止時的處理吧,這個對理解CachedThreadPool有幫助,具體程式碼如下:
Java程式碼
注意workDone()方法中的tyrTerminate()方法,它是你以後理解執行緒池中shuDown()以及CachedThreadPool原理的關鍵,具體程式碼如下:
Java程式碼
第一次寫這麼長的博文,還是躲著專案經理寫的,真不容易,希望能對想了解java執行緒池原理的朋友們有一點幫助。
例項程式碼1
Java程式碼
- Runnable runnable = new CountService(intArr);
- ThreadPoolExecutor execute = (ThreadPoolExecutor)Executors.newFixedThreadPool(10);
- //或者使用:ThreadPoolExecutor execute = (ThreadPoolExecutor)Executors.newCachedThreadPool();
- execute.submit(runnable);
在分析ThreadPoolExecutor原始碼前,先了解下面兩個概念:
1.核心執行緒(任務):
2.工作執行緒:由執行緒池中建立的執行緒,是用來獲得核心執行緒並執行核心執行緒的執行緒(比較拗口哦,具體看程式碼就知道是什麼東東了)。
Executors是一個執行緒池工廠,各種型別的執行緒池都是通過它來建立的,注意把它和Executor分開,感覺這個執行緒池工廠命名有點問題。
我們主要分析下我們提交任務的處理邏輯,即’execute.submit(runnable)’的實現。
Submit()方法是在ThreadPoolExecutor繼承的抽象類AbstractExecutorService中實現的,具體程式碼如下:
Java程式碼
- public Future<?> submit(Runnable task) {
- if (task == null) thrownew NullPointerException();
- //對核心執行緒的一個包裝,RunnableFuture還是一個Runnable
- RunnableFuture<Object> ftask = newTaskFor(task, null);
- //核心執行緒執行邏輯
- execute(ftask);
- return ftask;
- }
從程式碼中可以看出,執行緒的執行邏輯通過execute()完成,而execute是在AbstractExecutorService的子類ThreadPoolExecutor中實現的。看,一個典型的模板模式!廢話少說,下面看ThreadPoolExecutor中execute()方法中程式碼:
Java程式碼
- publicvoid execute(Runnable command) {
- if (command == null)
- thrownew NullPointerException();
- /*
- * command執行緒執行的整個邏輯在 addIfUnderCorePoolSize(command)方法中實現
- * 一般適用於FixedThreadPool
- */
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- /*
- * poolSize >= corePoolSize條件成立情景:當建立的為CacheThreadPool時,條件
- * 就能成立
- */
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- //兩種情況下執行該方法:1.執行緒池shutdown 2.CacheThreadPool中第一個核心執行緒的執行
- ensureQueuedTaskHandled(command);
- }
- //CacheThreadPool中執行緒的執行邏輯
- elseif (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
- }
- }
注意:CachedThreadPool和FixedThreadPool的邏輯實現都是在ThreadPoolExecutor中實現的。它兩的主要區別就是屬性corePoolSize以及workQueue的初始值的不同。具體可自己檢視工程類Executors的newFixedThreadPool()和newCachedThreadPool方法。由於這些初始值的不同,所以實現的邏輯也不同,具體的我在程式碼中已經註釋了。
command執行緒執行的整個邏輯在 addIfUnderCorePoolSize(command)方法中實現的,
詳細請看addIfUnderCorePoolSize(command)原始碼:
Java程式碼
- privateboolean addIfUnderCorePoolSize(Runnable firstTask) {
- Thread t = null;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- //poolSize < corePoolSize 即當前工作執行緒的數量一定要小於你設定的執行緒最大數量
- //CachedThreadPool永遠也不會進入該方法,因為它的corePoolSize初始為0
- if (poolSize < corePoolSize && runState == RUNNING)
- t = addThread(firstTask);
- } finally {
- mainLock.unlock();
- }
- if (t == null)
- returnfalse;
- t.start(); //執行緒執行了
- returntrue;
- }
看’t.start()’,這表示工作執行緒啟動了,工作執行緒t啟動的前提條件是’t = addThread(firstTask); ‘返回值t必須不為null。好了,現在想看看java執行緒池中工作執行緒是怎麼樣的嗎?請看addThread方法:
Java程式碼
- private Thread addThread(Runnable firstTask) {
- //Worker就是典型的工作執行緒,所以的核心執行緒都在工作執行緒中執行
- Worker w = new Worker(firstTask);
- //採用預設的執行緒工廠生產出一執行緒。注意就是設定一些執行緒的預設屬性,如優先順序、是否為後臺執行緒等
- Thread t = threadFactory.newThread(w);
- if (t != null) {
- w.thread = t;
- workers.add(w);
- //沒生成一個工作執行緒 poolSize加1,但poolSize等於最大執行緒數corePoolSize時,則不能再生成工作執行緒
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- }
- return t;
- }
看見沒,Worker就是工作執行緒類,它是ThreadPoolExecutor中的一個內部類。下面,我們主要分析Worker類,如瞭解了Worker類,那基本就瞭解了java執行緒池的整個原理了。不用怕,Worker類的邏輯很簡單,它其實就是一個執行緒,實現了Runnable介面的,所以,我們先從run方法入手,run方法原始碼如下:
Java程式碼
- publicvoid run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- /**
- * 注意這段while迴圈的執行邏輯,沒執行完一個核心執行緒後,就會去執行緒池
- * 佇列中取下一個核心執行緒,如取出的核心執行緒為null,則當前工作執行緒終止
- */
- while (task != null || (task = getTask()) != null) {
- runTask(task); //你所提交的核心執行緒(任務)的執行邏輯
- task = null;
- }
- } finally {
- workerDone(this); // 當前工作執行緒退出
- }
- }
- }
從原始碼中可看出,我們所提交的核心執行緒(任務)的邏輯是在Worker中的runTask()方法中實現的。這個方法很簡單,自己可以開啟看看。這裡要注意一點,在runTask()方法中執行核心執行緒時是呼叫核心執行緒的run()方法,這是一個尋常方法的呼叫,千萬別與執行緒的啟動(start())混合了。這裡還有一個比較重要的方法,那就是上述程式碼中while迴圈中的getTask()方法,它是一個從池佇列中取的核心執行緒(任務)的方法。具體程式碼如下:
Java程式碼
- Runnable getTask() {
- for (;;) {
- try {
- int state = runState;
- if (state > SHUTDOWN)
- returnnull;
- Runnable r;
- if (state == SHUTDOWN) //幫助清空佇列
- r = workQueue.poll();
- /*
- * 對於條件1,如果可以超時,則在等待keepAliveTime時間後,則返回一null物件,這時就
- * 銷燬該工作執行緒,這就是CachedThreadPool為什麼能回收空閒執行緒的原因了。
- * 注意以下幾點:1.這種功能情況一般不可能在fixedThreadPool中出現
- * 2.在使用CachedThreadPool時,條件1一般總是成立,因為CachedThreadPool的corePoolSize
- * 初始為0
- */
- elseif (poolSize > corePoolSize || allowCoreThreadTimeOut) //------------------條件1
- r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
- else
- r = workQueue.take(); //如果佇列不存在任何元素 則一直等待。 FiexedThreadPool典型模式----------條件2
- if (r != null)
- return r;
- if (workerCanExit()) { //--------------------------條件3
- if (runState >= SHUTDOWN) // Wake up others
- interruptIdleWorkers();
- returnnull;
- }
- // Else retry
- } catch (InterruptedException ie) {
- // On interruption, re-check runState
- }
- }
- }
從這個方法中,我們需要了解一下幾點:
1.CachedThreadPool獲得任務邏輯是條件1,條件1的處理邏輯請看註釋,CachedThreadPool執行條件1的原因是:CachedThreadPool的corePoolSize時刻為0。
2.FixedThreadPool執行的邏輯為條件2,從’workQueue.take()’中我們就明白了為什麼FixedThreadPool不會釋放工作執行緒的原因了(除非你關閉執行緒池)。
最後,我們瞭解下Worker(工作執行緒)終止時的處理吧,這個對理解CachedThreadPool有幫助,具體程式碼如下:
Java程式碼
- /**
- * 工作執行緒退出要處理的邏輯
- * @param w
- */
- void workerDone(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w); //從工作執行緒快取中刪除
- if (--poolSize == 0) //poolSize減一,這時其實又可以建立工作執行緒了
- tryTerminate(); //嘗試終止
- } finally {
- mainLock.unlock();
- }
- }
注意workDone()方法中的tyrTerminate()方法,它是你以後理解執行緒池中shuDown()以及CachedThreadPool原理的關鍵,具體程式碼如下:
Java程式碼
- privatevoid tryTerminate() {
- //終止的前提條件就是執行緒池裡已經沒有工作執行緒(Worker)了
- if (poolSize == 0) {
- int state = runState;
- /**
- * 如果當前已經沒有了工作執行緒(Worker),但是執行緒佇列裡還有等待的執行緒任務,則建立一個
- * 工作執行緒來執行執行緒佇列中等待的任務
- */
- if (state < STOP && !workQueue.isEmpty()) {
- state = RUNNING; // disable termination check below
- Thread t = addThread(null);
- if (t != null)
- t.start();
- }
- //設定池狀態為終止狀態
- if (state == STOP || state == SHUTDOWN) {
- runState = TERMINATED;
- termination.signalAll();
- terminated();
- }
- }
- }
第一次寫這麼長的博文,還是躲著專案經理寫的,真不容易,希望能對想了解java執行緒池原理的朋友們有一點幫助。