多執行緒之執行緒池newFixedThreadPool(二)
阿新 • • 發佈:2019-02-19
在上一章中我們概述了一下執行緒池,這一章我們看一下建立newFixedThreadPool的原始碼。例子還是我們在上一章中寫的那個例子。
建立newFixedThreadPool的方法:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
上面這兩個方法是建立固定數量的執行緒池的兩種方法,兩者的區別是:第二種建立方法多了一個執行緒工廠的方法。我們繼續看ThreadPoolExecutor這個類中的建構函式:public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
ThreadPoolExecutor的建構函式:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPollExecutor中的所有的建構函式最終都會呼叫上面這個建構函式,接下來我們來分析一下這些引數的含義:
corePoolSize:
執行緒池啟動後,在池中保持的執行緒的最小數量。需要說明的是執行緒數量是逐步到達corePoolSize值的。例如corePoolSize被設定為10,而任務數量只有5,則執行緒池中最多會啟動5個執行緒,而不是一次性地啟動10個執行緒。maxinumPoolSize:
執行緒池中能容納的最大執行緒數量,如果超出,則使用RejectedExecutionHandler拒絕策略處理。keepAliveTime:
執行緒的最大生命週期。這裡的生命週期有兩個約束條件:一:該引數針對的是超過corePoolSize數量的執行緒;二:處於非執行狀態的執行緒。舉個例子:如果corePoolSize(最小執行緒數)為10,maxinumPoolSize(最大執行緒數)為20,而此時執行緒池中有15個執行緒在執行,過了一段時間後,其中有3個執行緒處於等待狀態的時間超過keepAliveTime指定的時間,則結束這3個執行緒,此時執行緒池中則還有12個執行緒正在執行。unit:
這是keepAliveTime的時間單位,可以是納秒,毫秒,秒,分鐘等。workQueue:
任務佇列。當執行緒池中的執行緒都處於執行狀態,而此時任務數量繼續增加,則需要一個容器來容納這些任務,這就是任務佇列。這個任務佇列是一個阻塞式的單端佇列。
threadFactory:
定義如何啟動一個執行緒,可以設定執行緒的名稱,並且可以確定是否是後臺執行緒等。handler:
拒絕任務處理器。由於超出執行緒數量和佇列容量而對繼續增加的任務進行處理的程式。 OK,ThreadPoolExecutor中的主要引數介紹完了。我們再說一下執行緒的管理過程:首先建立一個執行緒池,然後根據任務的數量逐步將執行緒增大到corePoolSize,如果此時仍有任務增加,則放置到workQueue中,直到workQueue爆滿為止,然後繼續增加池中的執行緒數量(增強處理能力),最終達到maxinumPoolSize。那如果此時還有任務要增加進來呢?這就需要handler來處理了,或者丟棄新任務,或者拒絕新任務,或者擠佔已有的任務。在任務佇列和執行緒池都飽和的情況下,一旦有執行緒處於等待(任務處理完畢,沒有新任務)狀態的時間超過keepAliveTime,則該執行緒終止,也就是說池中的執行緒數量會逐漸降低,直至為corePoolSize數量為止。在《編寫高質量程式碼 改善Java程式的151個建議》這本書裡舉的這個例子很形象:OK,接下來我們來看一下怎麼往任務隊裡中放入執行緒任務:在java.util.concurrent.AbstractExecutorService這個類的submit方法
submit方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);//執行任務
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);//執行任務
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);//執行任務
return ftask;
}
這是三個過載方法,分別對應Runnable、帶結果的Runnable介面和Callable回撥函式。其中的newTaskFor也是一個過載的方法,它通過層層的包裝,把Runnable介面包裝成了適配RunnableFuture的實現類,底層實現如下:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
在submit中最重要的是execute這個方法,這個方法也是我們分析的重點execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在這個方法中分為三部分
1、如果少於corePoolSize數量的執行緒在執行,則啟動一個新的執行緒並把傳進來的Runnable做為第一個任務。然後會檢查執行緒的執行狀態和worker的數量,阻止不符合要求的任務新增到執行緒中
2、如果一個任務成功的放入到了佇列中,我們仍然需要二次檢查我們是否應該新增執行緒或者停止。因此我們重新檢查執行緒狀態,是否需要回滾佇列,或者是停止或者是啟動一個新的執行緒
3、如果我們不能新增佇列任務了,但是仍然在往佇列中新增任務,如果新增失敗的話,用拒絕策略來處理。
這裡最主要的是addWorker這個方法:
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
我們在這個方法裡建立一個執行緒,注意這個執行緒不是我們的任務執行緒,而是經過包裝的Worker執行緒。所以這裡的run方法是Worker這個類中的run方法。execute方法是通過Worker類啟動的一個工作執行緒,執行的是我們的第一個任務,然後該執行緒通過getTask方法從任務佇列總獲取任務,之後再繼續執行。這個任務佇列是一個BlockingQueue,是一個阻塞式的,也就是說如果該佇列元素為0,則保持等待狀態。直到有任務進入為止。