1. 程式人生 > >Java執行緒池execute()方法原始碼解析

Java執行緒池execute()方法原始碼解析

先看作者給出的註釋來理解執行緒池到底有什麼作用

* Thread pools address two different problems: they usually* provide improved performance when executing large numbers of* asynchronous tasks, due to reduced per-task invocation overhead,* and they provide a means of bounding and managing the resources,* including threads, consumed when executing a collection of tasks.

* Each {@code ThreadPoolExecutor} also maintains some basic* statistics, such as the number of completed tasks.

執行緒池處理了兩個不同的問題,執行緒池通過減少執行緒正式呼叫之前的開銷來給大量非同步任務更優秀的表現,與此同時給出了一系列繫結管理任務執行緒的一種手段。每個執行緒池都包含了一些基本資訊,比如內部完成的任務數量。

讓我們先看ThreadPoolExecutor類的一系列代表狀態的

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl作為AtomicInteger類存放了類中的兩種資訊,在其中由高3位來儲存執行緒池的狀態,後29位來儲存此時執行緒池中的Woker類執行緒數量(由此可知,執行緒池中的執行緒數量最高可以接受大約在五億左右)由此可見給出的runStateOf()workerCountOf()方法分別給出了檢視執行緒狀態和執行緒數量的方法。

該類一共給出了五種狀態,讓我們看作者給出的註釋,

*   RUNNING:  Accept new tasks and process queued tasks*   SHUTDOWN: Don't accept new tasks, but process queued tasks

*   STOP:     Don't accept new tasks, don't process queued tasks,*             and interrupt in-progress tasks*   TIDYING:  All tasks have terminated, workerCount is zero,*             the thread transitioning to state TIDYING*             will run the terminated() hook method*   TERMINATED: terminated() has completed

RUNNING狀態可以接受新進來的任務,同時也會執行佇列裡的任務。

SHUTDOWN 狀態已經不會再接受新任務,但仍舊會處理佇列中的任務。

STOP狀態在之前的基礎上,不會處理佇列中的人物,在執行的任務也會直接被打斷。

TIDYING狀態在之前的基礎上,所有任務都已經終止,池中的Worker執行緒都已經為0,也就是stop狀態在清理完所有工作執行緒之後就會進入該狀態,同時在shutdown狀態在佇列空以及工作執行緒清理完畢之後也會直接進入這個階段,這一階段會迴圈執行terminated()方法。

TERMINATED 狀態作為最後的狀態,在之前的基礎上terminated()方法也業已執行完畢,才會從上個狀態進入這個狀態,代表執行緒池已經完全停止。

由於執行緒池的狀態都是通過AtomicInteger來儲存的,可以通過比較的方式簡單的得到當前執行緒狀態。


private final BlockingQueue<Runnable> workQueue;

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

private volatile long keepAliveTime;

private volatile boolean allowCoreThreadTimeOut;

private volatile int corePoolSize;

private volatile int maximumPoolSize;

接下來是執行緒池的幾個有關工作執行緒的變數

corePoolSize表示執行緒池中允許存活最少的工作執行緒數量,但值得注意的是如果allowCoreThreadTimeOut一旦設定true(預設false),每個執行緒的存活時間只有keepAliveTime也就是說在allowCoreThreadTimeOuttrue的時候,該執行緒池最小的工作執行緒數量為0;maximumPoolSize代表執行緒池中最大的工作執行緒數量。

keepAliveTime為執行緒池中工作執行緒數量大於corePoolSize時,每個工作執行緒的在等待工作時最長的等待時間。

workQueue作為執行緒池的任務等待佇列,這個將在接下來的execute()裡詳細解釋。

Workers作為存放執行緒池中存放工作執行緒的容器。

largestPoolSize用來記錄執行緒池中存在過的最大的工作執行緒數量。

completedTaskCount用來記錄執行緒池完成的任務的總數。

Handler作為執行緒池中在不能接受任務的時候的拒絕策略,我們可以實現自己的拒絕策略,在實現了RejectedExecutionHandler介面的前提下。下面是執行緒池的預設拒絕策略,

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

threadFactory作為執行緒池生產執行緒的工廠類,下面是執行緒池預設的執行緒工廠的生產執行緒方法。

public Thread newThread(Runnable r) {
    Thread t = new Thread(group, r,
                          namePrefix + threadNumber.getAndIncrement(),
                          0);
    if (t.isDaemon())
        t.setDaemon(false);
    if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
    return t;
}

我們可以先看我們最常呼叫的execute()方法。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute()內部的呼叫邏輯非常清晰。

如果當前執行緒池的工作執行緒數量小於corePoolSize,那麼直接呼叫addWoker(),來新增工作執行緒。下面是addWorker()的具體方法


private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())                         
		   throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這段方法比較長,但整體的邏輯還是清晰的。

首先判斷當前執行緒池的狀態如果已經狀態不是shutdown或者running,或者已經為shutdown但是工作佇列已經為空,那麼這個時候直接返回新增工作失敗。接下來是對執行緒池執行緒數量的判斷,根據呼叫時的core的值來判斷是跟corePoolSize還是 maximumPoolSize判斷。

在確認了執行緒池狀態以及執行緒池中工作執行緒數量之後才真正開始新增工作執行緒

新建立一個worker類(執行緒池的內部類,具體的工作執行緒),將要執行的具體執行緒做為構造方法中的引數傳遞進去,接下來將其加入執行緒池的工作執行緒容器workers,並且更新工作執行緒最大量,最後呼叫worker工作執行緒的start()方法就完成了工作執行緒的建立與啟動

讓我們回到execute()方法,如果我們在一開始的執行緒數量就大於corePoolSize或者我們在呼叫addworker()方法的過程中出現了問題導致新增工作執行緒數量失敗,那麼我們會繼續執行接下來的邏輯。

在判斷完畢執行緒池的狀態後則會將任務通過workQueue.offer())方法試圖加進任務佇列。Offer()方法的具體實現會根據線上程池構造方法中選取的任務佇列種類而產生變化

但是如果成功加入了任務佇列仍舊需要注意判斷如果執行緒池的狀態如果已經不是running那麼會拒絕執行這一任務並執行相應的拒絕策略。在最後需要記得成功加入佇列成功後如果執行緒池中如果已經沒有了工作執行緒,需要重新建立一個工作執行緒去執行仍舊在任務佇列中等待執行的任務。

如果在之前的前提下加入任務佇列也失敗了(比如任務佇列已滿),則會在不超過執行緒池最大執行緒數量的前提下建立一個工作執行緒來處理

如果在最後的建立工作執行緒也失敗了那麼我們只有很遺憾的執行任務的拒絕策略了

在之前的過程中我們建立了工作執行緒Worker()類,那麼我們現在看看worker類的內部實現,也可以說是執行緒池的核心部分。Worker類作為執行緒池的內部類,接下來是Worker()類的成員。

final Thread thread;

Runnable firstTask;

volatile long completedTasks;

thread作為worker的工作執行緒空間,由執行緒池中所設定的執行緒工廠生成。

firstTask則是worker在構造方法中所接受到的所要執行的任務。

completedTasks作為該worker類所執行完畢的任務總數。

接下來我們可以看最重要的也就是我們之前建立完Worker類之後立馬呼叫的run()方法了

public void run() {
    runWorker(this);
}

run()方法實現的很簡單我們可以繼續追蹤下去

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); 
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

如果這個worker還沒有執行過在構造方法就傳入的任務那麼在這個方法中會直接執行這一任務,如果沒有,則會嘗試去從任務隊列當中去取的新的任務。

但是在真正呼叫任務之前仍舊會判斷執行緒池的狀態如果已經不是running亦或是shutdwon則會直接確保執行緒被中斷如果沒有將會繼續執行並確保不被中斷

接下來可見我們所需要的任務直接在工作執行緒中直接以run()方式以非執行緒的方式所呼叫,這裡也就是我們所需要的任務真正執行的地方。

在執行完畢後工作執行緒的使命並沒有真正宣告段落while部分worker仍舊會通過getTask()方法試圖取得新的任務下面是getTask()的實現

private Runnable getTask() {
    boolean timedOut = false; 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

               if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        boolean timed;            
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  
            if (runStateOf(c) != rs)
                continue retry;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}


首先仍舊會判斷執行緒池的狀態是否是running還是shutdown以及stop狀態下佇列是否仍舊有需要等待執行的任務。如果狀態沒有問題,則會跟據allowCoreThreadTimeOutcorePoolSize的值通過對前面這兩個屬性解釋的方式來選擇從任務佇列中獲得任務的方式(是否設定timeout)。其中的timedOut保證了確認前一次試圖取任務時超時發生的記錄以確保工作執行緒的回收

runWorker()方法的最後呼叫了processWorkerExist()方法來執行工作執行緒的回收

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

在這一方法中,首先確保已經重新更新了執行緒池中工作執行緒的數量,之後從執行緒池中的工作執行緒容器移去當前工作執行緒,並且將完成的任務總數加到執行緒池的任務總數當中。

在最後仍舊要確保執行緒池中依舊存在大於等於最小執行緒數量的工作執行緒數量存在如果沒有則重新建立工作執行緒去等待處理任務佇列中任務