java 執行緒池原始碼分析
阿新 • • 發佈:2021-07-18
- java執行緒池幹了些什麼事
- 主要是維持一些常駐的執行緒,避免每次執行任務新建執行緒的開銷
- 將執行緒的執行和提交解耦,使用者只用關心執行緒的提交,而不用關心執行緒的執行
- 我們通常怎麼使用執行緒池
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); Future<?> future = threadPoolExecutor.submit(new Runnable() { @Override public void run() { int a = 1/0; } }); future.get();
- 從submit方法出發,看看都幹了些什麼事
- submit方法是executorService介面的方法,在threadPoolExecutor的父類,abstractExecutorService中被實現。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
- 可以看到,先是將runnable轉換成一個futureTask。然後執行。
- 在threadPoolExecutor中
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
- 如果當前worker的數量少於coreSize,直接新加一個worker, 否則,將任務加入佇列中。
- 看下worker的結構。
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
- worker 本身是一個runnable的例項,同時包含一個thread變數。在worker方法被執行緒執行時,會呼叫到run方法,然後看下runWorker。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 首先runWorker是一個死迴圈,不斷的從queue中取任務執行,由於是阻塞佇列,佇列中無任務時,阻塞等待。需要注意的是,如果這裡task拋了異常,會丟擲exception,然後執行workerExit邏輯
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
- 如果woker因為異常退出了,那這個worker和worker內部維護的thread都會被銷燬,從workset中移除之後,worker沒有引用,等待被gc。同時,看是不是要補充新的worker。
ScheduledThreadPoolExecutor
- ScheduledThreadPoolExecutor相比普通的threadPoolExecutor幹了什麼事呢,無非是一個任務,隔一定間隔重複執行。
- 我們怎麼使用ScheduledThreadPoolExecutor?
@Test
public void scheduleExecutorService() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
executorService.scheduleWithFixedDelay(()->{
System.out.println("aa");
},2,2,TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(()->{
System.out.println("aa");
},2,2,TimeUnit.SECONDS);
}
- 看下scheduleAtFixedRate和scheduleWithFixedDelay的區別是什麼
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
- 兩個方法的唯一區別是在構造ScheduledFutureTask時,scheduleWithFixedDelay傳入的是負值,看下怎麼處理這個時間的,
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
- 對於scheduleAtFixedRate,p>0,下次執行時間是上次執行時間+p。 對於scheduleWithFixedDelay,p<0,下次執行時間為當前時間+p,即本次任務的執行結束時間+p
- scheduleWithFixedDelay前一個任務完成之後,開始計時,到指定的delay之後,有機會執行,具體執行時間看排程
- scheduleAtFixedRate, 從任務提交開始,計算時間,按照指定的間隔執行任務。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
- 我們對於scheduleThreadPool的任務提交,提交之後,即加入佇列中。然後和ThreadPoolExecutor一樣,由worker不斷的從佇列中取任務出來執行。且執行完之後,再設定下一次的執行時間,扔到佇列裡。
- 這裡的佇列是維護了一個最小堆,每次從堆頂拿要執行的任務。上面我們知道,worker從佇列中取任務時,是呼叫佇列的take方法。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
- 如果佇列中沒有任務,則等待,如果隊首的任務已經到該執行的時間了,則直接返回,如果還沒到delay的時間,則等待delay時間。
- 這裡leader是幹啥的呢?leader用於記載當前哪個執行緒是第一個來取隊首任務的執行緒,如果一個執行緒標記了,後面的執行緒就執行等待了,同時這個標記的執行緒只await delay的時間,這樣,當次執行緒醒來的時候,直接可以取到隊首的元素去執行了。如果不這樣的話,加入所有的執行緒都await(),後面大家還需要再競爭一次。這樣通過leader機制,減少了一次競爭。
- 那如果再等待的時候,又來了一個更快執行的任務呢,
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
- 如果佇列裡通過offer又加入了一個新的task,且加在了隊首,那這時候,會呼叫 available.signal();,喚醒所有再等待的執行緒,大家再重新競爭,重新獲取隊首任務。