1. 程式人生 > >java8之ThreadPoolExecutor原始碼解讀

java8之ThreadPoolExecutor原始碼解讀

前言

        現在多數伺服器都有多個CPU,這為多執行緒併發執行任務提供了良好的硬體支援,在開發中,我們也會根據具體業務嘗試使用多執行緒程式設計來提高業務執行的效率。那麼在java8中如何正確的使用多執行緒?如何減少資源的消耗讓任務執行更高效呢?

 

執行緒池

        在java中建立、銷燬多執行緒都是要破費很多資源的,所以在使用中應該避免自己建立多執行緒去執行任務,而是使用執行緒池來處理。我們常用的就是ExecutorService,它是執行多執行緒的服務,而ThreadPoolExecutor就是它的一個實現類。在Executors類中,提供了建立執行緒池的各種靜態方法。

我們先來看看Executors類中都有哪些方法,以下是此類的結構,可以看到此類不能建立例項只提供靜態方法。

 

其中最常使用的有:

  • newFixedThreadPool(int tnum)建立一個固定執行緒數量的執行緒池,例如:tnum=3表示最多有三個執行緒同時工作。
  • newSingleThreadExecutor 建立單執行緒的執行緒池,也就是說它只會有一個執行緒來工作,從而可以保證所有任務按照指定優先順序順序執行。
  • newCachedThreadPool建立一個可快取的無界(不超過int的max_value)執行緒池,如果執行緒池長度超過處理任務,可以回收空閒執行緒,否則新建執行緒來處理此次任務。
  • newScheduledThreadPool延時處理執行緒池,支援定時執行或定時週期執行任務。

上面的四種執行緒池除newScheduledThreadPool是由ScheduledThreadPoolExecutor類實現的功能,其它的都是由ThreadPoolExecutor實現或間接由ThreadPoolExecutor實現。下面我們來看看ThreadPoolExecutor有何神奇之處。

 

ThreadPoolExecutor實現原理

建構函式與引數

       首先我們來看一下ThreadPoolExecutor類的建構函式

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

其它的建構函式都是由此生成,只是傳入了一些預設值,其中各引數的意義:

  1. corePoolSize,核心執行緒數,表示執行緒池中最大可以保留多少個執行緒來處理任務,如果任務數量小於此值,則會新建立一個工作執行緒來處理此任務。
  2. maxinumPoolSize,最大執行緒數(注意沒有"核心"二字),表示執行緒池中最多可以建立多少個執行緒。當需要處理的任務很多時,maxinumPoolSize減去corePoolSize>0的執行緒可以看作是“遊離執行緒”,在處理完任務後可以被回收掉。
  3. keepAliveTime,這個不太好理解,它是任務的最大空閒時間,超過這個時間沒有取到任務就會銷燬此工作執行緒。
  4. unit,這個就是時間的單位
  5. workQueue,工作執行緒阻塞佇列,當任務大於核心執行緒數後,會將任務處理加入此佇列排隊等待處理。
  6. threadFactory,建立工作執行緒的工廠類
  7. handler,當任務數量超過了maxinumPoolSize就是拒絕此任務,使用hanlder處理策略。實現RejectedExecutionHandler此介面的類(jdk中此介面的實現類都是TheadPoolExecutor內部類),共有四種策略:
    1. AbortPolicy,直接拒絕處理,並丟擲RejectedExecutionException異常。
    2. DiscardPolicy,直接忽略(丟棄)。
    3. CallerRunsPolicy,直接啟動執行傳入的任務執行緒。
    4. DiscardOldestPolicy,丟棄最老的那個任務,來執行此任務。

 

原始碼解讀

我們一般的使用過程是:

1.先建立一個執行緒池

private static ExecutorService executorService = Executors.newFixedThreadPool(3);

2.然後使用此執行緒池來執行任務執行緒

executorService.execute(new Runnable() {
    public void run() {
        ///需要執行的業務
    }
});

 

我們進入execute方法,看一看它是如何執行的

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
     ///下面這段英文,說明了執行的步驟
/*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
//首先判斷工作執行緒數是否小於核心執行緒數,如果是則新建立一個worker來執行此任務。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
//否則,如果執行緒池沒有shutdown就把任務加入到阻塞佇列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
//如果此時再次判斷,已經shutdown了就從佇列中刪除並拒絕執行任務(拒絕策略上面已經提到)
        if (! isRunning(recheck) && remove(command))
            reject(command);
//如果此時工作執行緒為0,就加入並啟動一個空的工作執行緒(為什麼是一個空的工作執行緒呢?
//因為上面已經把任務已經加入到隊列當中了,只需要啟動一個工作執行緒去執行佇列中的任務即可)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
//如果加入佇列不成功(這個有可能是傳入的阻塞佇列有大小限制,
//newFixedThreadPool直接使用的是new LinkedBlockingQueue<Runnable>()。)
//就嘗試新新增工作執行緒來執行此次任務。
    else if (!addWorker(command, false))
        reject(command);
}

可以看到,上面的原始碼列出了執行的大概流程,其中有些是方法的封裝,例如reject(command),這個就是執行拒絕策略的,而另一個就是addWorker,新增一個工作執行緒,我們來看看它幹了哪些工作

/**
如果英文好的同學可以直接看這裡,哈哈
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry://一個for迴圈標記
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 //如果執行狀態正常
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
 //如果工作執行緒數量大於核心執行緒數量或最大執行緒數量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
 //迴圈原子新增工作執行緒數量
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
 //把任務封裝成一個worker(也是一個執行緒)
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
 //把工作執行緒worker新增到workers的Set中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; //重置最大工作執行緒數
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); //如果新增到Set成功就啟動此worker
                workerStarted = true; //標記啟動成功
            }
        }
    } finally {
        if (! workerStarted) //如果沒有啟動成功,標記此worker為失敗
            addWorkerFailed(w);
    }
    return workerStarted;
}

可以看到addWorker執行的重要部分就是,把使用者傳入的任務封裝成一個worker並且把這個worker新增到一個Set當中並啟動此worker。我們來看一下worker類的實現

private final class Worker
    extends AbstractQueuedSynchronizer//同步的佇列
    implements Runnable//也是一個執行緒
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
//重點,當上面的執行t.start時就會執行此run
/** Delegates main run loop to outer runWorker  */
public void run() {
    runWorker(this);
}
.........
}

 

/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
//首先獲取任務,為空則跳出迴圈
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);//執行任務前可以執行其它的業務,
                                        //可以看到如果此方法有異常丟擲則不會執行此任務的內容,使用時需要注意。
                Throwable thrown = null;
                try {
                    task.run();//使用者傳入的任務雖然是一個Runnable但這裡只會呼叫它的run方法
                               //並不會啟動使用者傳入的執行緒,轉而使用worker的執行緒來處理
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
//執行任務完成後需要執行的其它業務,可以使用if(thrown==null)來判斷是否有異常出現,
//可以補救此次任務。
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
//worker的任務重置為null,此工作執行緒執行的任務總數加1
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
//是否需要回收此工作執行緒
        processWorkerExit(w, completedAbruptly);
    }
}

下面我們來看看getTask是如何實現的

 

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
/**重點,判斷是否允許核心執行緒超時或者工作執行緒數多於核心執行緒數。
如果是則使用超時獲取任務(超時後即返回null上面的runWorker的迴圈就會跳出,從而回收此worker),
否則使用阻塞獲取(直到有任務加入,這也是為什麼在execute方法中直接加入到佇列的原因。
此時並不會跳出迴圈,此worker也就是存活的可以處理其它任務,從而達到不用重複建立、銷燬執行緒的目的)
*/
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

這裡只列出主要的方法,其它的方法都是一些輔助功能,如果需要了解請自行檢視原始碼。我們以newFixedThreadPool為例看一下它是如何使用我們需要的功能的

 

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

對比引數說明,可以看到所謂的固定數量的執行緒數量就是核心執行緒數量與最大執行緒數量相等,並且不使用超時。

 

總結

        看完上面所述是否覺得眼前一亮,又重新對執行緒池有了進一步的瞭解呢。這就是分析原始碼的益處,我們可以窺探作者思考路徑,解決問題的思路。如果想達到執行緒的重用那就把任務重新包裝成一個執行緒只要保持封裝的這個執行緒正常執行,就可以繼續處理任務,可以達到重用的目的。這個也為我們開始相似的業務提供一種實現思路。