1. 程式人生 > >執行緒池原理--執行器ThreadPoolExecutor

執行緒池原理--執行器ThreadPoolExecutor

文章目錄


執行緒池原理–總索引

執行緒池原理–執行器ThreadPoolExecutor

ThreadPoolExecutor 是Executor的核心實現類。

屬性

  • 執行緒池執行狀態
    • RUNNING:接受新的任務,並且處理佇列中的任務
    • SHUTDOWN: 不接受新的任務,但是仍然處理佇列中的任務
    • STOP: 不接受新的任務,也不處理佇列中的任務
    • TIDYING: 所有的任務已經結束, workerCount 為0,程式會呼叫鉤子方法
      terminated(),這個什麼也沒做。
    • TERMINATED: 所有的任務都已經完成。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private
static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;

在這裡插入圖片描述

  • AtomicInteger ctl
    原子整形包裝的執行緒池控制狀態(The main pool control state)。
    在這裡插入圖片描述
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//rs :高3位,runState , wc : 低29位,workerCount,執行緒池中當前活動的執行緒數量
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • CAPACITY 執行緒池的最大容量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private final BlockingQueue<Runnable> workQueue;
  • ReentrantLock 用於對於工作集和related bookkeeping(不知道啥意思)的併發訪問鎖控制。
private final ReentrantLock mainLock = new ReentrantLock();
  • 工作執行緒集,Worker是ThreadPoolExecutor類的內部類,並實現了Runnable介面,使用者提交的任務都是給Worker執行緒執行。
private final HashSet<Worker> workers = new HashSet<>();
  • Tracks largest attained pool size. Accessed only under mainLock.
private int largestPoolSize;
private final Condition termination = mainLock.newCondition();
  • 完成任務的計數器,當任務結束時更新。
private long completedTaskCount;
  • 建立新執行緒的工廠
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
  • 預設的拒絕策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  • 當 allowCoreThreadTimeOut設定為true.
    如果當前執行緒池中的執行緒如果大於corePoolSize,那麼如果空閒時間超過keepAliveTime,那麼就會銷燬掉一些執行緒。
    否則,就一直等到有新的任務執行。
 private volatile long keepAliveTime;
  • true:核心執行緒即使空閒也不會被銷燬掉。
  • false:核心執行緒空閒超過keepAliveTime定義的超時時間,則會被銷燬掉。
private volatile boolean allowCoreThreadTimeOut;
  • 執行緒池核心執行緒大小
private volatile int corePoolSize;
  • 最大的執行緒池大小 = 核心執行緒大小 + 非核心執行緒大小
private volatile int maximumPoolSize;
  • 執行時許可權

 private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
  • 呼叫finalize會用到
 private final AccessControlContext acc;
  this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();

構造器

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

構造器引數介紹

下面來解釋下各個引數:

  • int corePoolSize:該執行緒池中核心執行緒數最大值
    核心執行緒:執行緒池新建執行緒的時候,如果當前執行緒總數小於corePoolSize,則新建的是核心執行緒,如果超過corePoolSize,則新建的是非核心執行緒,核心執行緒預設情況下會一直存活線上程池中,即使這個核心執行緒啥也不幹(閒置狀態)。
    如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那麼核心執行緒如果不幹活(閒置狀態)的話,超過一定時間(時長下面引數決定),就會被銷燬掉。

  • int maximumPoolSize: 該執行緒池中執行緒總數最大值
    執行緒總數 = 核心執行緒數 + 非核心執行緒數。

  • long keepAliveTime
    當 allowCoreThreadTimeOut設定為true.
    如果當前執行緒池中的執行緒如果大於corePoolSize,那麼如果空閒時間超過keepAliveTime,那麼就會銷燬掉一些執行緒。
    否則,就一直等到有新的任務執行。

  • TimeUnit unit:keepAliveTime的單位
    TimeUnit是一個列舉型別,其包括:
    NANOSECONDS : 1微毫秒 = 1微秒 / 1000
    MICROSECONDS : 1微秒 = 1毫秒 / 1000
    MILLISECONDS : 1毫秒 = 1秒 /1000
    SECONDS : 秒
    MINUTES : 分
    HOURS : 小時
    DAYS : 天

  • BlockingQueue ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列。

    • LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列。
    • PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列。
    • DelayQueue: 一個使用優先順序佇列實現的無界阻塞佇列。
    • SynchronousQueue: 一個不儲存元素的阻塞佇列。
    • LinkedTransferQueue: 一個由連結串列結構組成的無界阻塞佇列。
    • LinkedBlockingDeque: 一個由連結串列結構組成的雙向阻塞佇列。
  • ThreadFactory threadFactory:執行緒工廠,用於建立執行緒執行 我們提交的任務。

  • RejectedExecutionHandler handler:這個指定當佇列滿時繼續新增任務該u做如何處理,詳細可以看執行緒池原理–拒絕策略之RejectedExecutionHandler類

execute()方法

execute()方法用於提交任務。

//提交的Runnable介面的實現類
//實際使用者可以提交Runnable介面的實現類或者Callable介面的實現類,AbstractExecutorService會在submit()方法中進行預處理,將Callable型別物件轉化為Runnable型別物件
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();         
         */
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

在這裡插入圖片描述

  1. 如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(需要獲得全域性鎖)
  2. 如果執行的執行緒等於或多於corePoolSize ,則將任務加入BlockingQueue
  3. 如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(需要獲得全域性鎖)
  4. 如果建立新執行緒將使當前執行的執行緒超出maxiumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。
    在這裡插入圖片描述
  • addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

引數:
firstTask: worker執行緒的初始任務,可以為空
core: true:將corePoolSize作為上限,false:將maximumPoolSize作為上限
addWorker方法有4種傳參的方式:

1、addWorker(command, true)

2、addWorker(command, false)

3、addWorker(null, false)

4、addWorker(null, true)

在execute方法中就使用了前3種,結合這個核心方法進行以下分析
第一個:執行緒數小於corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false
第二個:當佇列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果執行緒池也滿了的話就返回false
第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker線上程執行的時候會去任務佇列裡拿任務,這樣就相當於建立了一個新的執行緒,只是沒有馬上分配任務
第四個:這個方法就是放一個null的task進Workers Set,而且是在小於corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什麼也不幹。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為執行緒池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行