1. 程式人生 > >ThreadPoolExecutor 幾個疑惑與解答

ThreadPoolExecutor 幾個疑惑與解答

keepal cor throw 線程數 核心 判斷 並且 on() 阻塞

  1. 任務是否都要先放入隊列?
    當工作線程數小於核心線程數時,任務是不會經過隊列,而是直接創建 Worker 時傳入。但是如果工作線程數已經大於核心線程數,則任務是要先放入隊列的。實際上只要是被創建的工作線程所執行都是不需要經過工作隊列的,而是在創建新工作線程時作為參數傳入處理。對應就是調用 addWorker 方法的地方。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 工作線程數<核心線程數,創建核心線程並直接執行該任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 工作線程數大於核心線程數,小於最大允許線程數,創建線程並執行該任務
    else if (!addWorker(command, false))
        reject(command);
}
  1. 什麽時候創建額外的線程
    隊列已經滿了,並且當前工作線程數小於最大允許線程數才會創建額外的線程。實際上所有調用 addWorker 方法的地方都會經過該判斷。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 隊列隊列已經滿了,才會執行 addWorker 邏輯
    else if (!addWorker(command, false))
        reject(command);
}


// addWorker 中的部分代碼
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        // 增加新的線程數
        if (compareAndIncrementWorkerCount(c))
            break retry;
        c = ctl.get(); // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
  1. 怎麽銷毀多余線程
    當隊列中沒有任務之後,執行的線程將會被銷毀。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // getTask() 返回為null,也就是隊列中沒有任務了
            while (task != null || (task = getTask()) != null) {
                // 省略代碼
            }
            completedAbruptly = false;
        } finally {
            // 移除worker
            processWorkerExit(w, completedAbruptly);
        }
    }
  1. 如何實現讓多余線程在指定時間後銷毀?
    超過核心線程數的線程,獲取隊列任務會使用 poll 方法,增加阻塞時間,如果在指定的時間沒有任務到達,就會返回null,從而銷毀該線程
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // poll 方法設置時間,也就是獲取任務增加阻塞時間
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
  1. Worker 繼承AQS的作用
    繼承了AQS類,可以方便的實現工作線程的中止操作;
  2. 可以設置的最大線程數
    2^29 次方,因為ctl高三位被用於表示當前線程池的狀態了,所以只有29位用於表示最大線程池的大小。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
  1. 拒絕策略什麽時候起作用?
    工作線程數已經達到 maximumPoolSize,並且隊列已經滿了,則會啟用拒絕策略
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 工作線程數<核心線程數,創建核心線程並直接執行該任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        // 工作線程數已經達到達到設置值,隊列也已經滿,則拒絕
        reject(command);
}

歡迎轉載,但請註明本文鏈接,謝謝你。
2019.04.21 17:47

ThreadPoolExecutor 幾個疑惑與解答