JDK執行緒池
本文地址: juejin.im/post/5dba4a…
執行緒池用於存放執行緒,通過對執行緒的複用,很大程度上減少了頻繁建立和銷燬執行緒導致的資源損耗. 下面簡單地介紹一下 JDK(1.8)中的執行緒池.
執行緒池引數
在介紹JDK的4種執行緒池之前,先介紹一下執行緒池的幾個引數
- corePoolSize 執行緒池的核心執行緒數量,
- maximumPoolSize 執行緒池的最大執行緒數量
- keepAliveTime 執行緒被回收的最大空閒時間
- keepAliveTime 的單位(ms、s、...)
- BlockingQueue 任務佇列,存放任務
- ThreadFactory 執行緒工廠
- RejectedExecutionHandler 執行緒池拒絕策略(當任務過多的時候,執行緒池拒絕任務的方式)
allowCoreThreadTimeOut 允許核心執行緒池被回收,預設 false,so 預設情況下 核心執行緒並不會被回收掉.
// 用一個原子變數來儲存執行緒池狀態和執行緒數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));
// 32 - 3 = 29,int 的左邊三位表示執行緒池狀態,右邊29位表示執行緒數量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大執行緒數量: 000 & (29個1) = 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 執行緒池的幾個狀態
// 100 & (29個0),執行緒池執行狀態
private static final int RUNNING = -1 << COUNT_BITS;
// 000 & (29個0),執行shutdown方法時,不接收新的任務,會先執行完當前已經接收的,任務佇列任務執行完 && 執行緒已經全部終止: state -> TIDYING
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 & (29個0),執行 shutdownNow() 方法時
private static final int STOP = 1 << COUNT_BITS;
// 010 & (29個0)
private static final int TIDYING = 2 << COUNT_BITS;
// 011 & (29個0),執行緒池已經終止
private static final int TERMINATED = 3 << COUNT_BITS;
複製程式碼
JDK 提供的四種執行緒池介紹
當設定為無界佇列的時候,最大執行緒數並不會起作用,因為非核心執行緒只有在佇列滿了之後,才會被新增
newFixedThreadPool
固定執行緒池數量,核心執行緒數 = 最大執行緒數
任務佇列: LinkedBlockingQueue(Integer.MAX_VALUE) 無界佇列
適用於同時處理固定任務數的場景.
public static ExecutorService newFixedThreadPool(int nThreads) {
// coreThreads = maxThreads
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
複製程式碼
newCachedThreadPool
核心執行緒數為0,最大為 Integer.MAX_VALUE,也就是當任務足夠多的時候,可以無限增加執行緒. 並且所有的執行緒空閒超過一段時間(呼叫 Executors 建立的預設 KeepAlive為 60s)就會被回收.
任務佇列: SynchronousQueue 預設傳入引數 fair=false
,即處理任務非公平.
適用於處理小任務
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
複製程式碼
SynchronousQueue
public SynchronousQueue(boolean fair) {
// fair = true 則會按照FIFO先入先出的順序執行
// fair = false(預設值) 則優先取出最新新增的任務,最早新增的任務最晚執行
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
複製程式碼
newSingleThreadExecutor
單個執行緒的執行緒池
任務佇列: LinkedBlockingQueue 同樣是無界佇列
適用於需要將任務按順序執行的時候
public static ExecutorService newSingleThreadExecutor() {
// 核心執行緒數=最大執行緒數=1
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,1,new LinkedBlockingQueue<Runnable>()));
}
複製程式碼
newScheduledThreadPool
固定核心執行緒數,執行緒數量不會再增長,maximumPoolSize 這個引數對定時執行緒池沒有作用.
oracle的api檔案是這麼寫的:
While this class inherits from
ThreadPoolExecutor
,a few of the inherited tuning methods are not useful for it. In particular,because it acts as a fixed-sized pool usingcorePoolSize
threads and an unbounded queue,adjustments tomaximumPoolSize
have no useful effect.
任務佇列: DelayedWorkQueue 無界佇列,這是 ScheduledThreadPoolExecutor 的一個內部類
更適用於需要延時執行或者定時需求的場景
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize,0,NANOSECONDS,new DelayedWorkQueue());
}
複製程式碼
執行緒池拒絕策略
AbortPolicy
拒絕新任務,並且丟擲異常,預設的拒絕策略
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
// 直接丟擲異常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
複製程式碼
CallerRunsPolicy
當拒絕任務的時候,由呼叫執行緒處理該任務.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
// 如果執行緒池未停止
if (!e.isShutdown()) {
// 當前執行緒直接呼叫任務的run方法
r.run();
}
}
}
複製程式碼
DiscardPolicy
拒絕新任務,靜悄悄的將新任務丟棄,而不通知(太坑了吧),具體看它的程式碼也是什麼事情都沒做,還真的就直接丟棄任務了.
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
// 任務被拒絕後,啥事情都不幹
}
}
複製程式碼
DiscardOldestPolicy
當任務滿時,拋棄舊的未處理的任務,然後重新執行 execute 方法(此過程會重複),除非執行緒池停止執行,這種情況任務將被丟棄.具體看程式碼
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r,ThreadPoolExecutor e) {
// 如果執行緒池停止,直接丟棄任務,不做任何處理
if (!e.isShutdown()) {
// 丟棄一個任務佇列中的任務
e.getQueue().poll();
// 重新執行被拒絕的任務,如果再次被拒絕,則會一直重複這個過程
e.execute(r);
}
}
}
複製程式碼
終止執行緒池
shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改狀態為 shutdown
advanceRunState(SHUTDOWN);
// 停止空閒的執行緒,有執行任務的不會停止
interruptIdleWorkers();
// ScheduledThreadPoolExecutor 定時執行緒池才用到這個方法
onShutdown();
} finally {
mainLock.unlock();
}
// 終止執行緒池裡的所有執行緒
tryTerminate();
}
複製程式碼
interruptIdleWorkers
,從方法名可以看出,它會終止掉當前空閒的執行緒
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 因為執行任務 runWorker() 的時候,執行了 worker.lock()方法
// 所以如果當前執行緒有任務在執行,則 tryLock不會成功,
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
複製程式碼
shutdownNow
這個方法的作用是立刻強制停止所有執行緒,即使該執行緒有正在執行的任務.
並且停止所有執行緒後,返回任務佇列中還未執行的任務.
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改狀態為 stop
advanceRunState(STOP);
// 強制停止所有執行緒,有任務在執行也不管
interruptWorkers();
// 返回還未執行的任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 終止執行緒池裡的所有執行緒
tryTerminate();
return tasks;
}
// 暫停所有 worker 執行緒
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// 如果執行緒已經啟動則直接終止
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
複製程式碼
下面這段程式碼兩個shutdown方法都呼叫到
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下情況,取消終止:
// 1.執行緒池是執行狀態 running
// 2.runStateAtLeast(c,TIDYING) 執行緒池狀態是 tidying 或者 terminated
// 3.執行緒池狀態為 shutdown && 任務佇列不為空,即是任務沒處理完 (shutdown狀態的時候會繼續處理已經新增的任務)
if (isRunning(c) ||
runStateAtLeast(c,TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**
* worker 在 {@link #getTask()} 的時候,獲取到 null 表示 worker 此時需要停止
* worker[] 移除當前worker後會呼叫這個方法將執行緒進行終止
* 每個worker停止的時候,會呼叫這個方法將當前執行緒進行終止
* so it is ONLY_ONE
* {@link #processWorkerExit}
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 程式碼要執行到這裡到話,( state == stop || ( state == shutdown && workQueue 佇列為空) ) && 沒有正在執行的執行緒
// 這兩種情況下,所有的執行緒都已經終止
// cas 嘗試修改 ctl 的狀態,
// 修改失敗,外層 for迴圈會再次執行
if (ctl.compareAndSet(c,ctlOf(TIDYING,0))) {
try {
// 這裡不做任何事情
terminated();
} finally {
// 最後將狀態修改為 terminated,表示執行緒池完全停止
ctl.set(ctlOf(TERMINATED,0));
// 通知所有在等待鎖的執行緒
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
複製程式碼
shutdown 和 shutdownNow 對比:
shutdown: 執行緒將執行完已經新增進佇列中的所有任務,不接受新任務.
shutdownNow: 立刻終止所有正在執行的執行緒,並且返回任務佇列中的任務.
最後
最後的最後,非常感謝你們能看到這裡!!你們的閱讀都是對作者的一次肯定!!!
覺得文章有幫助的看官順手點個贊再走唄(終於暴露了我就是來騙讚的(◒。◒)),你們的每個贊對作者來說都非常重要(異常真實),都是對作者寫作的一次肯定(double)!!!
這一篇的內容到這就結束了,期待下一篇 還能有幸遇見你!