java執行緒池 ThreadPoolExecutor
阿里巴巴的java開發規範上說執行緒池要自己用new建立,方便開發理解執行緒池的各個引數。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
引數介紹:
corePoolSize 核心執行緒數,預設情況下核心執行緒會一直存活,即使處於閒置狀態也不會受存keepAliveTime限制。除非將allowCoreThreadTimeOut設定為true。
maximumPoolSize 指的是執行緒池的最大大小(執行緒池中最大有corePoolSize 個執行緒可執行)。
keepAliveTime 指的是空閒執行緒結束的超時時間(當一個執行緒不工作時,過keepAliveTime 長時間將停止該執行緒)。
unit 是一個列舉,表示 keepAliveTime 的單位(有NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7個可選值)。
workQueue 表示存放任務的佇列(存放需要被執行緒池執行的執行緒佇列)。
threadFactory 執行緒工廠,提供建立新執行緒的功能。ThreadFactory是一個介面,只有一個方法
public interface ThreadFactory {
Thread newThread(Runnable r);
}
通過執行緒工廠可以對執行緒的一些屬性進行定製,比如執行緒名。
handler 拒絕策略(新增任務失敗後如何處理該任務).RejectedExecutionHandler是一個介面,只有一個方法
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
在這個方法中可以自己進行處理,比如日誌記錄被拒絕的任務。
執行原理
1、執行緒池剛建立時,裡面沒有一個執行緒。
2、當呼叫 execute() 方法新增一個任務時,執行緒池會做如下判斷:
a. 如果正在執行的執行緒數量小於 corePoolSize,那麼馬上建立執行緒執行這個任務;
b. 如果正在執行的執行緒數量大於或等於 corePoolSize,那麼將這個任務放入佇列。
c. 如果這時候佇列滿了,而且正在執行的執行緒數量小於 maximumPoolSize,那麼還是要建立執行緒執行這個任務;
d. 如果佇列滿了,而且正在執行的執行緒數量大於或等於 maximumPoolSize,那麼執行緒池會呼叫RejectedExecutionHandler的 rejectedExecution方法丟擲異常。
3、當一個執行緒完成任務時,它會從佇列中取下一個任務來執行。
4、當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒池會判斷,如果當前執行 的執行緒數大於 corePoolSize,那麼這個執行緒就被停掉。所以執行緒池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
任務佇列
SynchronousQueue
/**
* Always returns zero.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return zero
*/
public int size() {
return 0;
}
/**
* Always returns zero.
* A {@code SynchronousQueue} has no internal capacity.
*
* @return zero
*/
public int remainingCapacity() {
return 0;
}
這個佇列的size和remainingCapacity(可用數量)永遠返回0,執行緒池認為等待佇列已經滿了。
就是說用這個佇列,等待佇列的size為0,有新的任務加入,會直接執行(用空閒執行緒或建立新的執行緒),如果執行緒數量達到max,那麼執行緒池會丟擲異常。
ArrayBlockingQueue 和 LinkedBlockingQueue
一個是陣列實現,一個是連結串列實現,可以指定佇列長度。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通、最常用的阻塞佇列。
拒絕策略RejectedExecutionHandler
ThreadPoolExecutor中提供了幾種策略可以使用:
- AbortPolicy
- DiscardPolicy
- DiscardOldestPolicy
- CallerRunsPolicy
AbortPolicy
該策略是執行緒池的預設策略。使用該策略時,如果執行緒池佇列滿了丟掉這個任務並且丟擲RejectedExecutionException異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy
如果執行緒池佇列滿了,會直接丟掉這個任務並且不會有任何異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
DiscardOldestPolicy
將最早進入佇列的任務刪掉,再嘗試加入佇列。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
CallerRunsPolicy
使用此策略,如果新增到執行緒池失敗,那麼主執行緒(執行execute()方法的執行緒)會自己去執行該任務,不會等待執行緒池中的執行緒去執行。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
自定義
如果以上策略都不符合需求,可以自己定義拒絕策略,實現RejectedExecutionHandler介面,並實現rejectedExecution方法就可以了。具體的邏輯就在rejectedExecution方法中定義。