1. 程式人生 > >ThreadPoolExecutor執行緒池程式碼理解

ThreadPoolExecutor執行緒池程式碼理解

主要的併發訪問變數: 
1. ctl(3位:狀態位 + 29位:workerCount)
2. workers工作執行緒的集合以及一些統計資料,使用mainLock控制保護
3. Worker的中斷標誌,使用AbstractQueuedSynchronizer中的state形成lock和unlock保護

管理執行緒池執行緒: 
execute addWorker addWorkerFailed tryTerminate interruptIdleWorkers interruptWorkers shutdown shutdownNow interruptIfStarted
執行緒池中的工作執行緒:
runWorker processWorkerExit getTask


執行緒池狀態(狀態只能逐漸變大),只有管理執行緒會修改狀態:
RUNNING:可以接受新的任務,也可以處理阻塞佇列裡的任務
SHUTDOWN:不接受新的任務,但是可以處理阻塞佇列裡的任務
STOP:不接受新的任務,不處理阻塞佇列裡的任務,中斷正在處理的任務
TIDYING:過渡狀態,也就是說所有的任務都執行完了,當前執行緒池已經沒有有效的執行緒,這個時候執行緒池的狀態將會TIDYING,並且將要呼叫terminated方法
TERMINATED:終止狀態。terminated方法呼叫完成以後的狀態
狀態變化遷移:
RUNNING -> SHUTDOWN:手動呼叫shutdown方法,或者ThreadPoolExecutor要被GC回收的時候呼叫finalize方法,finalize方法內部也會呼叫shutdown方法
(RUNNING or SHUTDOWN) -> STOP:呼叫shutdownNow方法,可以在之前呼叫過shutdown
SHUTDOWN -> TIDYING:當佇列和執行緒池都為空的時候
STOP -> TIDYING:當執行緒池為空的時候
TIDYING -> TERMINATED:terminated方法呼叫完成之後
ThreadPoolExecutor內部還儲存著執行緒池的有效執行緒個數。

執行緒池數量:
一般來說管理執行緒會增加workerCount
工作執行緒退出時減少workerCount
但是在管理執行緒addWorker中沒有成功啟動一個執行緒是會回退workerCount

關閉方法總結:
shutdown方法會更新狀態到SHUTDOWN,不會影響阻塞佇列裡任務的執行,但是不會執行新進來的任務。同時也會回收閒置的Worker,閒置Worker(沒有獲得worker中lock)。
shutdownNow方法會更新狀態到STOP,會影響阻塞佇列的任務執行,也不會執行新進來的任務。同時會回收所有的Worker。

/**
* 繼承AbstractQueuedSynchronizer為了實現對中斷標誌的管理以及防止在runWorker前就被interrupt
* 在每次執行task前會lock,在中斷前會tryLock
*/
Worker執行緒
1. runWorker
 * 1. unlock 設定worker.status = 0,鎖初始化狀態(未鎖)
 * 2.for (;;)
 *   1. getTask獲得任務 或者 初始化task不為null
 *   2. worker.lock其他執行緒不允許設定worker的中斷標誌
 *   3. 1. 如果執行緒池已經處於>=STOP狀態並且當前執行緒沒有被中斷,中斷執行緒
        2. 如果執行緒池還處於RUNNING或SHUTDOWN狀態,並且當前執行緒已經被中斷了,重新檢查一下執行緒池狀態,(跟shutdownNow有競爭)
        3. 如果處於STOP狀態並且沒有被中斷
        那麼設定中斷執行緒worker.interrupt,表示可以退出
 *   4. task.run執行任務 (如果task執行發現異常,completedAbruptly=true)
     5. finally worker.unlock
 * 3. processWorkerExit

2. processWorkerExit
 * 1. 異常退出,補償一次 decrementWorkerCount 修改 workerCount
 * 2. mainLock.lock
 * 3. 修改workers刪除worker
 * 4. mainLock.unlock
 * 5. tryTerminate
 * 6. 1. 使用者執行的任務發生了異常
      2. Worker數量比執行緒池基本大小要小
      3. 阻塞佇列不空但是沒有任何Worker在工作
      addWorker(null, false); 新增一個執行緒

3. getTask
 * 1. 大於等於STOP 或 為SHUTDOWN && 佇列已經為空,decrementWorkerCount,返回null
 * 2. 超時 && 執行緒數超過(corePoolSize || 設定了allowCoreThreadTimeOut)  && (wc > 1 || 佇列為空)
  	decrementWorkerCount,返回null,用於回收多餘的worker
 * 3. take或者poll從佇列中獲取task


管理執行緒
1. execute 
 * 1.小於corePoolSize直接addWorker
 * 2.>= corePoolSize&&正在執行中, 放入workQueue中排隊處理
 *   offer後再次檢查,因為在之前ctl.get後到offer完成前有時間視窗,比如:offer完成前執行緒池狀態從RUNNING->TERMINATED這時,放到佇列
     中的任務永遠被遺忘了
     1.如果為非running狀態,remove 
     2.檢查是否有worker,因為command已經在queue中了需要有worker處理
 * 3.執行緒池狀態不是RUNNING或者workercount>=corePoolSize,
 *   addWorker(command, false),在addWorker函式會過濾執行緒池狀態,考慮正在shutdown的情況

2. addWorker 
 * 1. 1.執行緒池不在RUNNING狀態並且狀態是STOP、TIDYING或TERMINATED中的任意一種狀態
 *    2.執行緒池不在RUNNING狀態,執行緒池接受了新的任務firstTask!=null
 *    3. 執行緒池不在RUNNING狀態,阻塞佇列為空
 	  這些情況不增加worker
 * 2. compareAndIncrementWorkerCount 更新 workerCount
 * 3. 建立worker
 * 4. mainLock.lock後更新workers集合
 * 5. 啟動新建立的worker---------------------worker正式開始工作但是狀態還是-1,跑到runWorker後unlock才會變成0
 * 6. 如果啟動是否addWorkerFailed

3. addWorkerFailed 建立worker時失敗(worker沒有正常啟動)
 * 1. mainLock.lock
 * 2. decrementWorkerCount
 * 3. tryTerminate

4. tryTerminate(在worker減少或者removing tasks時被呼叫)
 * 1. 如果執行緒狀態為 Running、TERMINATED,TIDYING(已經在結束的路上了)、SHUTDOWN但是workQueue不為空不允許退出
 * 2. 到這裡說明狀態為shutdown&&佇列為空
      如果workerCount != 0, interruptIdleWorkers 中斷一個空閒的worker,
      在worker回收最後processWorkerExit還會呼叫tryTerminate
 * 3. compareAndSet 設定 status-TIDYING
 * 4. terminated
 * 5. 設定 compareAndSet status-TERMINATED最終結束

5. interruptIdleWorkers
 * 1. mainLock.lock
 * 2. 遍歷workers,如果執行緒沒有中斷 && worker.tryLock(如果正確執行task不會成功)成功
      設定worker的中斷標誌
 * 3. 如果只打斷一個,成功一個後退出

6. interruptWorkers
 * 1. mainLock.lock
 * 2. 遍歷所有workers,interruptIfStarted(剛初始化沒有執行到runWorker的會被過濾)

7. shutdown
 * 1. advanceRunState SHUTDOWN: 原來狀態大於等於targetState 或者 compareAndSet 設定 status 為 SHUTDOWN or STOP
 * 2. interruptIdleWorkers(false)設定所有worker的中斷標誌
 * 3. tryTerminate

8. shutdownNow
 * 1. advanceRunState STOP: 原來狀態大於等於targetState 或者 compareAndSet 設定 status 為 SHUTDOWN or STOP
 * 2. interruptWorkers
 * 3. drainQueue將沒有跑完的任務返回

9. interruptIfStarted(worker中方法)
 * 1. 如果狀態不是為-1(還沒有進入runWorker,只是初始化狀態) && 沒有中斷標誌
  	  設定中斷標誌
  假設管理執行緒呼叫shutdownNow-STOP,但是某個worker在呼叫interruptIfStarted時還是初始化狀態,
  隨後worker會進入runWorker函式發現執行緒池狀態以及是STOP狀態,所以會直接退出
主要函式註釋:
public class ThreadPoolExecutor  extends AbstractExecutorService  {

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;  //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //1fffffff

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;     //-536870912 e0000000, 初始化狀態
    private static final int SHUTDOWN   =  0 << COUNT_BITS;     //0 0
    private static final int STOP       =  1 << COUNT_BITS;     //536870912 20000000
    private static final int TIDYING    =  2 << COUNT_BITS;     //1073741824 40000000
    private static final int TERMINATED =  3 << COUNT_BITS;     //1610612736 60000000

    // 獲得state
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 獲得count
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 使用state和count拼接處ctl值
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    private BlockingDeque<Runnable> workQueue;

    private volatile RejectedExecutionHandler handler;

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    private final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    /**
     * 用於workers集合以及相關統計資訊併發控制
     * shutdown shutdownNow interruptIdleWorkers interruptWorkers
     * processWorkerExit addWorker termination 以及 getXXX都會用到
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    private final HashSet<Worker> workers = new HashSet<>();

    /**
     * tryTerminate-signal awaitTermination-await中使用
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * 在mainLock下使用
     */
    private long completedTaskCount;

    private volatile int corePoolSize;
    private volatile int maximumPoolSize;

    // 到目前為止出現過最大的數量
    private int largestPoolSize;
    private volatile ThreadFactory threadFactory;
    //一般使用在idle執行緒,如果allowCoreThreadTimeOut設定為true也適用
    private volatile long keepAliveTime;
    //預設為false,如果為true表示core threads使用keepAliveTime超時時間
    private volatile boolean allowCoreThreadTimeOut;

    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }

    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }

                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }

    }

    protected void terminated() { }

    /*
     *This method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown.
     */
    private void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 1.running
             * 2.TERMINATED,TIDYING
             * 3.SHUTDOWN但是workQueue不為空
             */
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) {
                return;
            }
            // 到這裡說明不為running狀態,如果shutdown狀態佇列也為空了,可以回收workers
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // 中斷閒置一個Worker,直到回收全部的Worker。
                // 這裡沒有那麼暴力,只中斷一個,中斷之後退出方法,中斷了Worker之後,Worker會回收,
                // 在processWorkerExit中然後還是會呼叫tryTerminate方法,如果還有閒置執行緒,那麼繼續中斷
                interruptIdleWorkers(true);
                return;
            }

            // 這裡到這裡說明不為running狀態,如果shutdown狀態佇列也為空了,工作執行緒為0
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // cas操作,將執行緒池狀態改成TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))){
                    try {
                        // 從TIDYING ->TERMINATED給一次回撥的機會
                        terminated();
                    } finally {
                        // terminated方法呼叫完畢之後,狀態變為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知awaitTerminate的執行緒
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock(); // 解鎖
            }
        }
    }

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 異常退出不會再getTask中decrementWorkerCount
        if (completedAbruptly) {
            decrementWorkerCount();
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 嘗試結束執行緒池,滿足條件會呼叫interruptIdleWorkers
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c,STOP)) {
            if (!completedAbruptly) {
                //正常結束情況
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;

                // 不需要開執行緒
                if (workerCountOf(c) >= min)
                    return;
            }

            // 新開一個Worker代替原先的Worker
            // 新開一個Worker需要滿足以下3個條件中的任意一個:
            // 1. 使用者執行的任務發生了異常
            // 2. Worker數量比執行緒池基本大小要小
            // 3. 阻塞佇列不空但是沒有任何Worker在工作
            addWorker(null, false);
        }
    }

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();

        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     *
     * @param firstTask 如果為null,表示這個worker起來是為了從佇列中獲得task
     *                  否則表示問因為有新任務execute引起worker的增加
     * @param core 是否檢查當前worker >= corePoolSize
     * @return true:成功新增並且worker已經started
     *          false:失敗
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (; ; ) {  // #1
            int c = ctl.get();
            int rs = runStateOf(c);

            /**
             * 1. 執行緒池不在RUNNING狀態並且狀態是STOP、TIDYING或TERMINATED中的任意一種狀態
             * 2. 執行緒池不在RUNNING狀態,執行緒池接受了新的任務firstTask!=null
             * 3. 執行緒池不在RUNNING狀態,阻塞佇列為空
             */
            if (rs > SHUTDOWN || (rs == SHUTDOWN && firstTask != null) || (rs == SHUTDOWN && workQueue.isEmpty()))
                return false;

            for (; ; ) { // #3
                int wc = workerCountOf(c);
                /**
                 * 超過執行緒數
                 */
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;

                // 原子修改workercount,成功跳出兩層迴圈
                if (compareAndIncrementWorkerCount(c))
                    break retry; // 跳轉到 #2

                c = ctl.get();

                // 檢查rs時發現有其他執行緒改變了,重新取ctl
                if (runStateOf(c) != rs)
                    continue retry; // 跳轉到 #1
                else {
                    // 如果狀態沒有被人改變跳轉到 #3
                }

            }
        }
        // #2

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;

        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;

            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();

                try {
                    int rs = runStateOf(ctl.get());

                    /**
                     * 1. running 狀態
                     * 2. 如果為shutdown狀態並且task為null
                     */
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask ==null)) {
                        //已經啟動過了
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted) {
                addWorkerFailed(w);
            }
        }

        return workerStarted;
    }

    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        /**
         * 1.小於corePoolSize直接addWorker
         * 2.>= corePoolSize&&正在執行中, 放入workQueue中排隊處理
         *   offer後再次檢查1.如果為非running狀態,remove 2.檢查是否有worker,因為command已經在queue中了需要有worker處理
         * 3.執行緒池狀態不是RUNNING或者workercount>=corePoolSize,
         *   addWorker(command, false),在addWorker函式會過濾執行緒池狀態,考慮正在shutdown的情況
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;

            // 這裡再取為了縮短時間視窗
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //放入後再做一次檢查,因為在ctl get到offer完成前有時間視窗,其他執行緒會修改ctl,所以需要recheck
            //如果offer完畢嘗試remove,如果可以remove就reject
            //否則判斷當前是否還有worker活著,如果沒有活著的worker,新增一個worker讓這個worker處理task
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //執行緒池狀態不是RUNNING或者workercount>=corePoolSize
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * 1. 狀態為>=STOP或者(rs==SHUTDOWN並且workQueue空)
     * 2. 超過maximumPoolSize
     * @return task
     *          null: 返回null前會compareAndDecrementWorkerCount表示當前worker需退出
     */
    private Runnable getTask() {
        boolean timedOut = false;

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 大於等於STOP 或 為SHUTDOWN && 佇列已經為空
            // if (rs >=  STOP || (rs == SHUTDOWN && workQueue.isEmpty())
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) &&
                    (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.MICROSECONDS)
                        : workQueue.take();
                if (r != null)
                    return r;

                timedOut = true;
            } catch (InterruptedException e) {
                timedOut = false;
            }
        }
    }

    private void runWorker(Worker worker) {
        Thread wt = Thread.currentThread();
        Runnable task = worker.firstTask;
        worker.firstTask = null;
        worker.unlock(); // 設定state=0,允許interrupts
        boolean completedAbruptly = true;
        try {
            // getTask阻塞的等待task或者超時
            while (task != null || (task = getTask()) != null) {
                // 如果拿到了任務,給自己上鎖,表示當前Worker已經要開始執行任務了,
                // 已經不是閒置Worker,有lock表示非閒置
                worker.lock();
                // 1. 如果執行緒池已經處於>=STOP狀態並且當前執行緒沒有被中斷,中斷執行緒
                // 2. 如果執行緒池還處於RUNNING或SHUTDOWN狀態,並且當前執行緒已經被中斷了,重新檢查一下執行緒池狀態,(跟shutdownNow有競爭)
                // 如果處於STOP狀態並且沒有被中斷,那麼中斷執行緒
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted()) {
                    //設定為執行緒中斷
                    wt.interrupt();
                }

                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //真正執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(wt, thrown);
                    }
                } finally {
                    task = null;
                    worker.completedTasks++;
                    worker.unlock();
                }

            }
            //正常結束
            completedAbruptly = false;
        } finally {
            // worker退出
            processWorkerExit(worker, completedAbruptly);
        }
    }

    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * 繼承AbstractQueuedSynchronizer為了實現對中斷標誌的管理以及防止在runWorker前就被interrupt
     * 在每次執行task前會lock,在中斷前會tryLock
     */
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        private Worker(Runnable task) {
            setState(-1);       //防止在runWorker前就被interrupts
            this.firstTask = task;
            this.thread = getThreadFactory().newThread(this);
        }

        @Override
            public void run() {
                runWorker(this);
            }

            protected boolean isHeldExclusively() {
                return getState() != 0;
            }

            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0,1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return  false;
            }

            protected boolean tryRelease() {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }

            public void lock() {
                acquire(1);
            }

            public boolean tryLock() {
                return tryAcquire(1);
            }

            public void unlock() {
                release(1);
            }

            public boolean isLock() {
                return isHeldExclusively();
            }

            void interruptIfStarted() {
                Thread t;
                // !=-1
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {

                    }
                }
            }
    }

    /**
     * 1. 原來狀態大於等於targetState
     * 2. 設定targetState狀態
     * @param targetState SHUTDOWN or STOP
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    void onShutdown() {
    }

    @Override
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers(false);
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                w.interruptIfStarted();
            }
        } finally {
            mainLock.unlock();
        }
    }

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }

        tryTerminate();
        return tasks;
    }
}


相關推薦

ThreadPoolExecutor執行程式碼理解

主要的併發訪問變數: 1. ctl(3位:狀態位 + 29位:workerCount) 2. workers工作執行緒的集合以及一些統計資料,使用mainLock控制保護 3. Worker的中斷標誌,使用AbstractQueuedSynchronizer中的state

python多執行————7、ThreadPoolExecutor執行

所用到的模組:from concurrent.futures import ThreadPoolExecutor,as_completed,wait,FIRST_COMPLETED 1、建立執行緒池:executor = ThreadPoolExecutor(max_workers= ) 2

常用執行程式碼

執行緒池的介紹 1 常用的 池化技術 C3P0 DBCP 2 執行緒池的衍生 頻繁的建立執行緒物件和多執行緒之間進行上下文切換,是非常耗時間和資源的所以JDK1.5中提出了執行緒池技術 3 使用執行緒池 Exector 4 執行緒池的建立 1 建立一個固

ThreadPoolExecutor(執行的構建)

首先整理一下執行緒池的一些概念 執行緒池的作用: 執行緒池作用就是限制系統中執行執行緒的數量。 根據系統的環境情況,可以自動或手動設定執行緒數量,達到執行的最佳效果;少了浪費了系統資源,多了造成系統擁擠效率不高。用執行緒池控制執行緒數量,其他執行緒 排隊等候。一個任務執行完畢,再從

​​​​​​​ThreadPoolExecutor執行之submit方法

jdk1.7.0_79    在上一篇《ThreadPoolExecutor執行緒池原理及其execute方法》中提到了執行緒池ThreadPoolExecutor的原理以及它的execute方法。本文解析ThreadPoolExecutor#submit。   

ThreadPoolExecutor 執行執行者

整體的執行邏輯,不妨舉個例子。某個工廠要招工人完成訂單。有兩個重要的引數:1,長工的數目(假設為10),2,總工人的最大數目(假設為20)。 一開始,沒有工人,所以要招長工。現在開始,每有一個訂單,就去招一個工人當長工。直到長工數為10。接下來,老闆考慮訂單雖然多了,可

Java執行理解和認識

什麼是程式,什麼是程序,什麼是執行緒,他們有什麼區別? 程式是指令和資料的有序集合,其本身並沒有任何執行的含義,是一個靜態的概念。 程序是一個動態的過程,是一個活動的實體。簡單來說,一個應用程式得到執行就可以看作是一個程序。程序可以包含多個同時執行的執行緒 執行緒是

JAVA中的執行理解

為了更好的理解,首先來看一篇JAVA中的執行緒池介紹: 一、簡介 執行緒池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為: ThreadPoolExecutor(int corePoolSize, int maxim

Java 執行建立 理解(簡單)(只要反覆理解,就能理解)

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class cachethreadpool {     public static void main (String

自己對執行理解

     工作中難免會遇到需要使用執行緒的地方,在使用執行緒的過程中,執行緒池我覺得是非常有必要去簡單瞭解一下的;這裡我會將自己所體會的感悟的一些基本內容分享出來,希望能對少數人有所幫助;如果有不對的地方,還請各位幫忙指出。 為什麼使用執行緒池?      在J

ThreadPoolExecutor執行的使用

實現例子:根據上傳檔案(.txt)按行讀取文字資料匯入mongodb (只做參考,實際也許不這麼幹) package net.youqu.manager.controller; import com.google.common.collect.Maps; impor

[python] ThreadPoolExecutor執行和ProcessPoolExecutor程序

引言 Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Py

ThreadPoolExecutor執行引數

2018年11月07日 14:49:17 剎那芳華_流年 閱讀數:4 個人分類: Java

論如何優雅的自定義ThreadPoolExecutor執行

更好的markDown閱讀體驗可直接訪問我的CSDN部落格:https://blog.csdn.net/u012881584/article/details/85221635 前言 執行緒池想必大家也都用過,JDK的Executors 也自帶一些執行緒池。但是不知道大家有沒有想過,如何才是最優雅的方式去使

十七、ThreadPoolExecutor執行

一、簡介 JDK的Executor框架的實現類ThreadPoolExecutor,實現了Executor介面和ExecutorService介面。 ThreadPoolExecutor執行過程如下: 1)判斷corePoolSize是否都執行中,如果不是那麼直接執行任務。 2)判斷緩衝佇列是否滿了,

你真的懂ThreadPoolExecutor執行技術嗎?看了原始碼你會有全新的認識

Java是一門多執行緒的語言,基本上生產環境的Java專案都離不開多執行緒。而執行緒則是其中最重要的系統資源之一,如果這個資源利用得不好,很容易導致程式低效率,甚至是出問題。 有以下場景,有個電話撥打系統,有一堆需要撥打的任務要執行,首先肯定是考慮多執行緒非同步去執行。假如我每執行一個撥打任務都new一個Th

淺談對執行理解

1、首先由幾個介面和類的關係是需要先說明的: extends   implementsextends Executor(介面)----------------->ExecutorService(介面)------------------->AbstractExe

ThreadPoolExecutor執行解析與BlockingQueue的三種實現

目的 主要介紹ThreadPoolExecutor的用法,和較淺顯的認識,場景的使用方案等等,比較忙碌,如果有錯誤還請大家指出 ThreadPoolExecutor介紹 ThreadPoolExecutor的完整構造方法的簽名如下 ThreadP

ThreadPoolExecutor執行詳細說明

ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler) corePoolSize: 核心執行緒數,能夠同時執行的任務數量 maxim

ThreadPoolExecutor執行的簡單應用

一、什麼是執行緒池? 執行緒池,其實就是一個容納多個執行緒的容器,其中的執行緒可以反覆使用,省去了頻繁建立執行緒物件的操作,無需反覆建立執行緒而消耗過多資源。 二、執行緒池的優勢 第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。 第二:提