A Brief Analysis of Java Thread Pools (threadPool)
The java.util.concurrent package added in Java 1.5 provides factory methods for creating five kinds of thread pools:
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService cacheExecutor = Executors.newCachedThreadPool(new TestThreadFactory());
ExecutorService fixExecutor = Executors.newFixedThreadPool(10);
ExecutorService fixedExecutor = Executors.newFixedThreadPool(10, new TestThreadFactory());
ExecutorService sigExecutor = Executors.newSingleThreadExecutor();
ExecutorService singleExecutor = Executors.newSingleThreadExecutor(new TestThreadFactory());
ScheduledExecutorService schExecutor = Executors.newScheduledThreadPool(10);
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(10, new TestThreadFactory());
ScheduledExecutorService ssExecutor = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService sigSchExcutor = Executors.newSingleThreadScheduledExecutor(new TestThreadFactory());
The underlying implementation is essentially the same for all of them: when the pool is constructed, a task queue (a BlockingQueue<Runnable>) is created. The first call to execute() or submit() creates a looping worker thread that repeatedly reads tasks from that queue and runs them (note: the first submitted task does not go through the queue at all; it is run directly by the newly created thread). Subsequent execute() or submit() calls simply put the Runnable into the queue. When the queue is empty, the looping thread blocks in the queue's take() method.
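As a rough illustration of that queue-plus-looping-thread structure, here is a deliberately simplified, single-worker sketch; it is not the real ThreadPoolExecutor code, and the class and field names are made up for the example:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy illustration only: one queue, one lazily created worker thread.
// The real ThreadPoolExecutor adds multiple workers, error handling,
// shutdown logic, keep-alive timeouts, and so on.
public class MiniPool {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    private Thread worker; // created lazily on the first execute()

    public synchronized void execute(final Runnable task) {
        if (worker == null) {
            // First submission: create the looping worker and hand it the
            // task directly, without going through the queue.
            worker = new Thread(new Runnable() {
                public void run() {
                    try {
                        Runnable r = task;
                        while (true) {
                            r.run();
                            r = workQueue.take(); // blocks while the queue is empty
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            worker.start();
        } else {
            // Subsequent submissions just go into the queue.
            workQueue.offer(task);
        }
    }
}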
SingleThreadExecutor is really just a special case of FixedThreadPool: SingleThreadExecutor assigns exactly one thread to loop over a given queue, reading and executing its tasks, while FixedThreadPool can assign several threads to the same queue.
newFixedThreadPool(10) ends up with 10 threads reading the same task queue, but those 10 threads are not created all at once; one is created per task submission (that is, per call to execute() or submit()). Once more than 10 tasks have been submitted, the 11th task is put straight into the BlockingQueue<Runnable>, and one of the 10 existing threads picks it up and runs it. The 10 threads created by FixedThreadPool are never later reclaimed down to 9, nor will the pool ever grow to 11.
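A small throw-away demo of this behavior (the class name and task bodies are just for illustration): submit 15 tasks to newFixedThreadPool(10) and print the executing thread's name; only ten distinct pool-thread names ever appear, and tasks 11 through 15 are run by whichever of those ten threads frees up first.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // Submit more tasks than there are threads: the first 10 submissions
        // each create a worker thread, the rest are queued and picked up by
        // whichever of the 10 workers becomes free.
        for (int i = 0; i < 15; i++) {
            final int taskNo = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("task " + taskNo + " on " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}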
CachedThreadPool does not fix the number of threads that read and execute tasks from the queue, but it does have a maximum thread count (Integer.MAX_VALUE = 2^31 - 1). When the task queue is saturated and a new task cannot be inserted (for a CachedThreadPool the queue is actually a zero-capacity SynchronousQueue, so the insert fails whenever no idle thread is waiting), a new thread is created to run that task, and afterwards it joins the others in draining the queue. If 10 threads were created during a peak but only one is needed during a lull, the other 9 are terminated once they have been idle for a while; the default keep-alive time is one minute. This is the point to keep in mind when contrasting it with FixedThreadPool.
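The grow-then-shrink behavior can be seen with a toy program like the following (the class name and the 70-second wait are just for the demo):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        // Burst of tasks: with no idle thread available to take the hand-off,
        // a new thread is created for each task.
        for (int i = 0; i < 10; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("burst task on " + Thread.currentThread().getName());
                    try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                }
            });
        }
        // Wait past the 60-second keep-alive; the idle threads are terminated,
        // and the next task is handled by a single (possibly new) thread.
        TimeUnit.SECONDS.sleep(70);
        pool.execute(new Runnable() {
            public void run() {
                System.out.println("later task on " + Thread.currentThread().getName());
            }
        });
        pool.shutdown();
    }
}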
The ScheduledThreadPool also has its thread count specified up front; its distinguishing feature is that queued tasks are read and executed according to a schedule, i.e. after a delay or at a fixed rate.
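For example (a minimal sketch, names chosen for the demo):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledPoolDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        // Run once after a 3-second delay.
        scheduler.schedule(new Runnable() {
            public void run() {
                System.out.println("one-shot task after delay");
            }
        }, 3, TimeUnit.SECONDS);
        // Run repeatedly: first after 1 second, then every 5 seconds.
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("periodic task at " + System.currentTimeMillis());
            }
        }, 1, 5, TimeUnit.SECONDS);
        // scheduler.shutdown() would be called once the periodic task is no longer needed.
    }
}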
Whatever kind of pool it is, when tasks are added to the queue faster than they are read out and executed, tasks may end up being dropped. The probability of that, from lowest to highest, is roughly CachedThreadPool < newFixedThreadPool < SingleThreadExecutor, which is easy to understand: the fewer threads draining the queue, the sooner it backs up. When a task cannot be accepted, the pool by default throws a RejectedExecutionException back to the caller.
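The Executors factory pools use unbounded queues or an effectively unlimited thread count, so the easiest way to actually observe the rejection is with an explicitly bounded ThreadPoolExecutor (or by submitting after shutdown()); a small sketch:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // 1 core thread, at most 2 threads, and a queue that holds only 1 task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        Runnable slowTask = new Runnable() {
            public void run() {
                try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
        };
        try {
            pool.execute(slowTask); // runs on the core thread
            pool.execute(slowTask); // sits in the queue
            pool.execute(slowTask); // queue full, a second thread is created
            pool.execute(slowTask); // queue full and max pool size reached -> rejected
        } catch (RejectedExecutionException e) {
            System.out.println("task rejected: " + e);
        } finally {
            pool.shutdown();
        }
    }
}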
The other constructor argument seen when creating these pools is a ThreadFactory. As the addThread() snippet below shows, every worker thread is obtained from it, so it is the hook for customizing those threads (giving them names, setting the daemon flag or priority, and so on).
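The TestThreadFactory used in the creation examples above is not shown in this post; one plausible shape for it is sketched below (the names and settings are assumptions for illustration):

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical TestThreadFactory: it only decides how worker threads are
// constructed, here by giving them a readable, numbered name.
public class TestThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "test-pool-thread-" + counter.getAndIncrement());
        t.setDaemon(false);                // keep the JVM alive while tasks run
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}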
Attached below are a few of the core code fragments (from ThreadPoolExecutor and its inner Worker class).
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Below the core size: try to start a new worker thread with this task.
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        // Core threads are all busy (or the add raced and failed): queue the task.
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        // Queue would not accept the task: try to grow towards maximumPoolSize, else reject.
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Re-check under the lock: only add a worker if we are still below
        // corePoolSize and the pool has not been shut down.
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start(); // the new worker runs firstTask directly, bypassing the queue
    return true;
}
private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    // The worker's thread comes from the ThreadFactory supplied at construction.
    Thread t = threadFactory.newThread(w);
    if (t != null) {
        w.thread = t;
        workers.add(w);
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}
/**
 * Runs a single task between before/after methods.
 */
private void runTask(Runnable task) {
    final ReentrantLock runLock = this.runLock;
    runLock.lock();
    try {
        /*
         * Ensure that unless pool is stopping, this thread
         * does not have its interrupt set. This requires a
         * double-check of state in case the interrupt was
         * cleared concurrently with a shutdownNow -- if so,
         * the interrupt is re-enabled.
         */
        if (runState < STOP &&
            Thread.interrupted() &&
            runState >= STOP)
            thread.interrupt();
        /*
         * Track execution state to ensure that afterExecute
         * is called only if task completed or threw
         * exception. Otherwise, the caught runtime exception
         * will have been thrown by afterExecute itself, in
         * which case we don't want to call it again.
         */
        boolean ran = false;
        beforeExecute(thread, task);
        try {
            task.run();
            ran = true;
            afterExecute(task, null);
            ++completedTasks;
        } catch (RuntimeException ex) {
            if (!ran)
                afterExecute(task, ex);
            throw ex;
        }
    } finally {
        runLock.unlock();
    }
}
/**
 * Main run loop
 */
public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null; // the very first task is run without touching the queue
        // Keep pulling tasks from the work queue; getTask() blocks on the
        // queue's take()/poll() when it is empty.
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}