ThreadPoolExecutor 幾個疑惑與解答
阿新 • • 發佈:2019-04-21
keepal cor throw 線程數 核心 判斷 並且 on() 阻塞
- 任務是否都要先放入隊列?
當工作線程數小於核心線程數時,任務是不會經過隊列,而是直接創建 Worker 時傳入。但是如果工作線程數已經大於核心線程數,則任務是要先放入隊列的。實際上只要是被創建的工作線程所執行都是不需要經過工作隊列的,而是在創建新工作線程時作為參數傳入處理。對應就是調用 addWorker 方法的地方。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 工作線程數<核心線程數,創建核心線程並直接執行該任務 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 工作線程數大於核心線程數,小於最大允許線程數,創建線程並執行該任務 else if (!addWorker(command, false)) reject(command); }
- 什麽時候創建額外的線程
隊列已經滿了,並且當前工作線程數小於最大允許線程數才會創建額外的線程。實際上所有調用 addWorker 方法的地方都會經過該判斷。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 隊列隊列已經滿了,才會執行 addWorker 邏輯 else if (!addWorker(command, false)) reject(command); } // addWorker 中的部分代碼 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增加新的線程數 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
- 怎麽銷毀多余線程
當隊列中沒有任務之後,執行的線程將會被銷毀。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // getTask() 返回為null,也就是隊列中沒有任務了 while (task != null || (task = getTask()) != null) { // 省略代碼 } completedAbruptly = false; } finally { // 移除worker processWorkerExit(w, completedAbruptly); } }
- 如何實現讓多余線程在指定時間後銷毀?
超過核心線程數的線程,獲取隊列任務會使用 poll 方法,增加阻塞時間,如果在指定的時間沒有任務到達,就會返回null,從而銷毀該線程
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// poll 方法設置時間,也就是獲取任務增加阻塞時間
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- Worker 繼承AQS的作用
繼承了AQS類,可以方便的實現工作線程的中止操作; - 可以設置的最大線程數
2^29 次方,因為ctl高三位被用於表示當前線程池的狀態了,所以只有29位用於表示最大線程池的大小。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- 拒絕策略什麽時候起作用?
工作線程數已經達到 maximumPoolSize,並且隊列已經滿了,則會啟用拒絕策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 工作線程數<核心線程數,創建核心線程並直接執行該任務
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
// 工作線程數已經達到達到設置值,隊列也已經滿,則拒絕
reject(command);
}
歡迎轉載,但請註明本文鏈接,謝謝你。
2019.04.21 17:47
ThreadPoolExecutor 幾個疑惑與解答