1. 程式人生 > >ThreadPoolExecutor的應用和實現分析(中)—— 任務處理相關源碼分析

ThreadPoolExecutor的應用和實現分析(中)—— 任務處理相關源碼分析

stateless 自身 tran als row exce 繼承 break attribute

轉自:http://www.tuicool.com/articles/rmqYjq

前面一篇文章從Executors中的工廠方法入手,已經對ThreadPoolExecutor的構造和使用做了一些整理。而這篇文章,我們將接著前面的介紹, 從源碼實現上對ThreadPoolExecutor在任務的提交、執行,線程重用和線程數維護等方面做下分析。

0. ThreadPoolExecutor類的聲明屬性變量分析

public class ThreadPoolExecutor extends AbstractExecutorService

從這個類聲明中我們可以看到java.util.ThreadPoolExecutor是繼承於AbstractExecutorService的,而之前的文章我也提到過,AbstractExecutorService已經實現了一些任務提交處理的方法,如submit()方法都是在這個抽象類中實現的。但submit()方法,最後也是會調用ThreadPoolExecutor的execute()方法。

打開SunJDK中的ThreadPoolExecutor類源碼,除了上篇文章提到的一些和構造方法中參數對應的屬性之外,讓我們看看還有什麽:

  • mainLock 對整個ThreadPoolExecutor對象的鎖
  • workers 存儲工作線程對應Worker對象的HashSet
  • termination 線程池ThreadPoolExecutor對象的生命周期終止條件,和mainLock相關
  • largestPoolSize 線程池跑過的最大線程數
  • completedTaskCount 完成任務數
  • ctl 執行器ThreadPoolExecutor的生命周期狀態和活動狀態的worker數封裝

稍微需要說一下最後一個, ctl是一個AtomicInteger對象,以位運算的方式打包封裝了當前線程池ThreadPoolExecutor對象的狀態和活動線程數兩個數據

1. 執行器狀態

ExecutorService中已經指定了這個接口對應的類要實現的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了狀態的含義,並包含其於ctl屬性中。

ThreadPoolExecutor對象有五種狀態,如下:

  • RUNNING 在ThreadPoolExecutor被實例化的時候就是這個狀態
  • SHUTDOWN 通常是已經執行過shutdown()方法,不再接受新任務,等待線程池中和隊列中任務完成
  • STOP 通常是已經執行過shutdownNow()方法,不接受新任務,隊列中的任務也不再執行,並嘗試終止線程池中的線程
  • TIDYING 線程池為空,就會到達這個狀態,執行terminated()方法
  • TERMINATED terminated()執行完畢,就會到達這個狀態,ThreadPoolExecutor終結

2. Worker內部類

它既實現了Runnable,同時也是一個AQS ( AbstractQueuedSynchronizer )。

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

封裝了3樣東西,Runnable類的首個任務對象,執行的線程thread和完成的任務數(volatile)completedTasks。

final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

這個類還提供了interruptIfStarted()這樣一個方法,裏面做了(getState()>= 0)的判斷。與此呼應,Worker的構造方法裏對state設置了-1,避免在線程執行前被停掉。

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

3. 提交任務

上篇文章已經提到了,提交新任務的時候,如果沒達到核心線程數corePoolSize,則開辟新線程執行。如果達到核心線程數corePoolSize, 而隊列未滿,則放入隊列,否則開新線程處理任務,直到maximumPoolSize,超出則丟棄處理。

這段源碼邏輯如下,不細說了。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

4. addWorker()的實現

在上面提交任務的時候,會出現開辟新的線程來執行,這會調用addWorker()方法。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代碼較長,我們可以分兩大部分看:

第一段從第3行到第26行,是雙層無限循環,嘗試增加線程數到ctl變量,並且做一些比較判斷,如果超出線程數限定或者ThreadPoolExecutor的狀態不符合要求,則直接返回false,增加worker失敗。

第二段從第28行開始到結尾,把firstTask這個Runnable對象傳給Worker構造方法,賦值給Worker對象的task屬性。Worker對象把自身(也是一個Runnable)封裝成一個Thread對象賦予Worker對象的thread屬性。鎖住整個線程池並實際增加worker到workers的HashSet對象當中。成功增加後開始執行t.start(),就是worker的thread屬性開始運行,實際上就是運行Worker對象的run方法。Worker的run()方法實際上調用了ThreadPoolExecutor的runWorker()方法。

5. 任務的執行runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這段代碼實際上就是執行提交給線程池執行的Runnable任務的實際內容。其中,值得註意的有以下幾點:

  • 線程開始執行前,需要對worker加鎖,完成一個任務後執行unlock()
  • 在任務執行前後,執行beforeExecute()和afterExecute()方法
  • 記錄任務執行中的異常後,繼續拋出
  • 每個任務完成後,會記錄當前線程完成的任務數
  • 當worker執行完一個任務的時候,包括初始任務firstTask,會調用getTask()繼續獲取任務,這個方法調用是可以阻塞的
  • 線程退出,執行processWorkerExit(w, completedAbruptly)處理

5. Worker線程的復用和任務的獲取getTask()

在上一段代碼中,也就是runWorker()方法,任務的執行過程是嵌套在while循環語句塊中的。每當一個任務執行完畢,會從頭開始做下一次循環執行,實現了空閑線程的復用。而要執行的任務則是來自於getTask()方法:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                     break;
                if (compareAndDecrementWorkerCount(c))
                     return null;
                c = ctl.get();
                // Re-read ctl
                if (runStateOf(c) != rs)
                     continue retry;
                // else CAS failed due to workerCount change; retry inner loop
             }
             try {
                 Runnable r = timed ?
                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                     workQueue.take();
                 if (r != null)
                     return r;
                 timedOut = true;
             } catch (InterruptedException retry) {
                 timedOut = false;
             }
         }
     }

getTask()實際上是從工作隊列(workQueue)中取提交進來的任務。這個workQueue是一個BlockingQueue,通常當隊列中沒有新任務的時候,則getTask()會阻塞。另外,還有定時阻塞這樣一段邏輯:如果從隊列中取任務是計時的,則用poll()方法,並設置等待時間為keepAlive,否則調用阻塞方法take()。當poll()超時,則獲取到的任務為null,timeOut設置為 true。這段代碼也是放在一個for(;;)循環中,前面有判斷超時的語句,如果超時,則return null。這意味著runWorker()方法的while循環結束,線程將退出,執行processWorkerExit()方法。

回頭看看是否計時是如何確定的。

int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判斷當前線程池的線程數是否超出corePoolSize,如果超出這個值並且空閑時間多於keepAlive則當前線程退出。

另外一種情況就是allowCoreThreadTimeOut為true,就是允許核心在空閑超時的情況下停掉。

6. 線程池線程數的維護和線程的退出處理

剛剛也提到了,我們再看下processWorkerExit()方法。這個方法最主要就是從workers的Set中remove掉一個多余的線程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
         if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
             decrementWorkerCount();
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             completedTaskCount += w.completedTasks;
             workers.remove(w);
         } finally {
             mainLock.unlock();
         }
         tryTerminate();
         int c = ctl.get();
         if (runStateLessThan(c, STOP)) {
             if (!completedAbruptly) {
                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

這個方法的第二個參數是判斷是否在runWorker()中正常退出了循環向下執行,如果不是,說明在執行任務的過程中出現了異常,completedAbruptly為true,線程直接退出,需要直接對活動線程數減1 。

之後,加鎖統計完成的任務數,並從workers這個集合中移除當前worker。

執行tryTerminate(),這個方法後面會詳細說,主要就是嘗試將線程池推向TERMINATED狀態。

最後比較當前線程數是不是已經低於應有的線程數,如果這個情況發生,則添加無任務的空Worker到線程池中待命。

以上,增加新的線程和剔除多余的線程的過程大概就是如此,這樣線程池能保持額定的線程數,並彈性伸縮,保證系統的資源不至於過度消耗。

ThreadPoolExecutor的應用和實現分析(中)—— 任務處理相關源碼分析