1. 程式人生 > >Java執行緒(八):ThreadPoolExecutor、RejectedExecutionHandler

Java執行緒(八):ThreadPoolExecutor、RejectedExecutionHandler

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService

  1. 執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。
  2. 每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
  3. 一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。

構造方法摘要

用給定的初始引數建立新的 ThreadPoolExecutor。

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 

構造方法引數解釋:

corePoolSize

池中所儲存的執行緒數,包括空閒執行緒。

maximumPoolSize

池中允許的最大執行緒數。

keepAliveTime

當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。

unit

keepAliveTime 引數的時間單位。

workQueue

執行前用於保持任務的佇列。此佇列僅保持由 execute 方法提交的 Runnable 任務。

threadFactory

執行程式建立新執行緒時使用的工廠。
預設DefaultThreadFactory,建立普通的優先順序為5且非守護的執行緒。

handler

由於超出執行緒範圍和佇列容量而使執行被阻塞時所使用的處理程式。
預設AbortPolicy,處理程式遭到拒絕將丟擲執行時 RejectedExecutionException。

常見方法

void execute(Runnable command)

在將來某個時間執行給定任務。可以在新執行緒中或者在現有池執行緒中執行該任務。 如果無法將任務提交執行,或者因為此執行程式已關閉,或者因為已達到其容量,則該任務由當前 RejectedExecutionHandler 處理。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

其中通過Thread t = threadFactory.newThread(w)將command例項化成執行緒。 workers.add( new Worker(command))將command放入到HashSet workers儲存的工作執行緒集合中,command執行完畢後 workers.remove(w);

void shutdown()

按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務。如果已經關閉,則呼叫沒有其他作用。

public void shutdown() {
    SecurityManager security = System.getSecurityManager();
    if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < SHUTDOWN)
                runState = SHUTDOWN;

            try {
                for (Worker w : workers) {
                    w.interruptIfIdle();
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            tryTerminate(); // Terminate now if pool and queue empty
        } finally {
            mainLock.unlock();
        }
}
shutdownNow

嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。在從此方法返回的任務佇列中排空(移除)這些任務。
並不保證能夠停止正在處理的活動執行任務,但是會盡力嘗試。

public List<Runnable> shutdownNow() {
    SecurityManager security = System.getSecurityManager();
    if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < STOP)
                runState = STOP;

            try {
                for (Worker w : workers) {
                    w.interruptNow();
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            List<Runnable> tasks = drainQueue();
            tryTerminate(); // Terminate now if pool and queue empty
            return tasks;
        } finally {
            mainLock.unlock();
        }
    }

shutdown及shutdownNow關閉任務的實現均是通過 Thread.interrupt() 取消任務,所以無法響應中斷的任何任務可能永遠無法終止。

poolSize與建構函式中幾個引數的關係

poolSize:當前執行的執行緒。

  1. 新任務提交時,若poolSize < corePoolSize,建立新執行緒來處理請求,即使其他輔助執行緒是空閒的。
  2. 若poolSize > corePoolSize,且poolSize < maximumPoolSize,workQueue未滿,放入workQueue中,等待執行緒池中任務排程執行。
  3. 第二種情況下,若workQueue已滿,新提交任務由RejectedExecutionHandler處理 。
  4. 當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理 。
  5. 當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒 。
  6. 當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉。

排隊

所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此佇列與池大小進行互動:

  1. 如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。
  2. 如果執行的執行緒等於或多於 corePoolSize,但又小於maximumPoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
  3. 如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,由RejectedExecutionHandler處理。

排隊有三種通用策略:

  1. 直接提交。工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。
  2. 無界佇列。使用無界佇列將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。
  3. 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。

當 Executor 已經關閉,並且 Executor 將有限邊界用於最大執行緒和工作佇列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被 拒絕,由RejectedExecutionHandler處理。

從原始碼看出,AbstractExecutorService提供submit、newTaskFor、invokeAny的預設實現,而ThreadPoolExecutor主要關注提交任務的處理,execute及shutdown。
通俗地理解:提交任務由AbstractExecutorService負責,處理提交請求及提交後執行緒任務的執行由ThreadPoolExecutor負責。
利用ReentrantLock保證執行緒同步安全, mainLock.newCondition()實現執行緒阻塞與喚醒。

其實這其中經常有大家忽視的兩點:
(1)執行緒池初始化時,alive執行緒數多少?
這裡就要看你如何使用執行緒池,預設執行緒池中沒有任何執行緒,在execute or submit時才建立執行緒執行任務。
ThreadPoolExecutor還有一種顯式建立:prestartCoreThread()建立一個核心執行緒 or 建立所有核心執行緒數prestartAllCoreThreads()。

    //顯式建立單個核心執行緒
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    //顯式建立所有核心執行緒
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

其實在內嵌的tomcat-embed-core核心包中,已經顯式建立所有核心執行緒:
這裡寫圖片描述
(2)執行緒池執行完任務後,又是如何保證核心執行緒處於alive狀態的?
執行緒池每次新增成功任務時,都會初始化一個Worker。

  //ThreadPoolExecutorb內部工作任務,對excute or submit 的任務進行了包裝
   Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);//本行作用相當於new Thread(firstTask)
   }     

這裡寫圖片描述
t.start()實際執行的是Worker類中run()->runWorker()->getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // 判斷workers是否需要剔除,即是否需要保證core thread alive
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                    //上述程式碼就保證了core thread是帶有超時時間,還是一直阻塞等待任務
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

RejectedExecutionHandler

public interface RejectedExecutionHandler

  1. 無法由 ThreadPoolExecutor 執行的任務的處理程式。
public interface RejectedExecutionHandler {

    /**
     * 當 execute 不能接受某個任務時,可以由 ThreadPoolExecutor 呼叫的方法。
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

ThreadPoolExecutor定義了四種:

  1. ThreadPoolExecutor.AbortPolicy:拒絕並丟擲RejectedExecutionException。
  2. ThreadPoolExecutor.CallerRunsPolicy:拒絕但在呼叫者的執行緒中直接執行該任務。
  3. ThreadPoolExecutor.DiscardPolicy :拒絕但不做任何動作。
  4. ThreadPoolExecutor.DiscardOldestPolicy:如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。