執行緒池的管理,原始碼解析以及RejectedExecution
對於執行緒池用的也不少,但是最近還是糟了一坑。還是要深入瞭解一下執行緒池的使用,以免以後遭重
對於JavaEE專案來說,服務端專案是一直啟動著,有的時候需要非同步或者大併發的專案來的時候,就會想到使用多執行緒或者執行緒池來維護有關的執行緒。
最近有一個很簡單的需求,就是一個方法,可能是非同步或者同步返回一個結果。該方法,全域性一個呼叫就夠了,不用很多執行緒同時工作。就想著用future,如果是同步的話就用get獲取結果,同步返回。如果是非同步的話,就不get,直接非同步返回一個正在處理
這個思路是沒問題的,於是程式碼如下(因為原因是一樣的問題,所以一下文章不考慮同步情況,只考慮非同步)
package threadPools.zxy.threadPools.zxy; import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadFuture { private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); public static void main(String[] args) { executor.submit(new Callable<String>() { public String call() throws Exception { System.out.println("task-->threadName:" + Thread.currentThread().getName()); Thread.sleep(1000); return null; } }); executor.shutdown(); } }
假設這就是伺服器上的程式碼,其實就是很簡單,我計劃申請一個執行緒池,corePoolSize為1, maximumPoolSize為1, keepAliveTime 為0, unit 為秒,workQueue為LinkedBlockingQueue。(其實這個和ExecutorService executor2 = Executors.newSingleThreadExecutor(),基本是一樣的,newSingleThreadExecutor()就是封裝後的ThreadPoolExecutor,這次就以ThreadPoolExecutor為例)
我的想法很簡單,就是就一個執行緒執行,也不需要其餘執行緒存活時間,非同步執行完一遍以後,執行緒池shutdown,釋放執行緒池,節省空間,等待下一次使用者的訪問。
結果在伺服器上執行該程式碼,只有第一次訪問的時候執行緒正常執行,後續訪問沒反應,會報錯,就像這樣:
Exception in thread "main" task-->threadName:pool-1-thread-1
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at threadPools.zxy.threadPools.zxy.ThreadFuture.main(ThreadFuture.java:14)
好好的程式碼,怎麼會報錯呢,趕時間開發,最隨便查了下,有人說是策略的問題,換個策略就好了,於是就將程式碼改成:public class ThreadFuture {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) {
//模仿多次客戶端請求
for (int i = 0; i < 10; i++) {
executor.submit(new Callable<String>() {
public String call() throws Exception {
System.out.println("task-->threadName:" + Thread.currentThread().getName());
Thread.sleep(1000);
return null;
}
});
executor.shutdown();
}
}
}
測了一下,現在是不報錯了,但是客戶端的十次請求,只有第一次訪問有效,後來的訪問全部沒有。這還不是問題的解決啊。轉而,想了下,會不會是executor.shutdown()的問題呢,可是很多人都說用完要關掉的啊,要不然不是每次訪問都會新增很多執行緒嗎!
too native! 這就是對執行緒池瞭解太淺了
其實,shutdown()方法就是關閉執行緒池,但其實準確來說,shutdown方法是已在執行的執行緒和佇列中的執行緒可以繼續執行,但是新加入的執行緒會被拒絕,因為,執行緒池已經說了,我要關閉了,就如同超市晚上關門,我不好意思敢客人們出來,你們繼續買,但是新的客人我是不會在放進去了,因為最後的客人買完東西,我就要關門了。
所以,盲目的改策略沒有任何用,因為是執行緒池已經關閉了。
什麼時候,shutdown()呢,這裡有個問題就是,你為什麼需要執行緒池。
其實,執行緒池的誕生就是因為,在大量訪問進來的時候,不斷的new thread(),會造成急劇的效能損失,而且,也不能對執行緒很方便的管理。就如同,一個公司,在一個月的某個時刻非常的忙,有100個任務,於是,公司在這個時候飛快的招了新的員工,在辦完任務,就把100個員工全開除了,這顯然是非常不合理的。
執行緒池的作用就是,在一個公司招了一個人事主管,這個人事主管招了幫忙招了5個人,在100個任務來的時候,給每人一個任務,剩餘的95個任務放在任務清單表裡,一個一個分配著做完。在不忙的時候,讓這五個人掛在哪裡休息,或者設定keepAliveTime,有一段時間沒活,還可以開除幾個節省資源maximumPoolSize,只保留核心員工corePoolSize。
剛剛的程式碼,在第一次執行以後,就把人事主管開除了,後續任務再進來,執行緒池都關閉了(公司都沒人了),誰幹活呢?
所以,平時在用main方法的時候呼叫shutdown()是因為正常執行結束後,執行緒池也就沒用了,但是伺服器是一直執行的,shutdown要慎重啊,後續的程式碼,把shutdown去掉以後,服務也就正常執行了,但這也引起了我對執行緒池的興趣。
執行緒池的各個引數到底是什麼意思呢,各個方法的呼叫和結束又經歷了哪些過程呢?
上一個程式碼:
public static void main(String[] args) {
final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(4);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, queue,
new ThreadPoolExecutor.AbortPolicy());
// 模仿多次客戶端請求
for (int i = 0; i < 10; i++) {
executor.submit(new Callable<String>() {
public String call() throws Exception {
Thread.sleep(500);
System.out.println("task-->threadName:" + Thread.currentThread().getName());
System.out.println("阻塞佇列長度" + queue.size());
Thread.sleep(500);
return null;
}
});
}
}
}
設一個阻塞佇列長度為4,設核心執行緒為1,最大執行緒為3,最大執行緒存活時間為0,策略是報錯丟擲策略
結果如下:
分析一下結果,在這個顯然核心執行緒不夠用的例子中,十輪迴圈,阻塞佇列最大的時候就是長度為4的時候,也是我設定的值,執行緒數最多啟動了三個,也符合了maximumPoolSize的申明。
首先執行緒報錯,是因為阻塞佇列放不下了,最大執行緒數也都起來了,但是還是不夠,這個時候3+4=7,而併發跑入的程式是10條,這就導致了。十份'活',給一個做,這個人做不完,放在佇列裡等著一會做,但是佇列只能放4個,沒辦法,在趕緊招兩個人,總共三個人,每人一份'活',佇列裡放四個。全滿了,再進來新的'活'的時候,佇列放不下,又不能招人了,那還是算了,我乾脆自爆吧,就按照異常策略丟擲了異常。
控制檯的輸出,也應證了這一點。一開始,執行緒1獲取工作,開始做工,剩餘的四個工作全部存入阻塞佇列,接下來兩份工作進來,沒地方存,但是最大數量執行緒池告知還可以擴員,於是又趕忙招了2兩個人,一起到達上線,maximumPoolSize=3,這個時候,在來新的工作,就沒地方了,丟擲了異常。
雖然丟擲了異常,但可以看到,剩餘的執行緒仍勤勤懇懇得把執行緒池裡面的剩餘四個任務分著做完了,才低調的離去,最終佇列長度為0
總結:
- corePoolSize:核心執行緒數
- 核心執行緒會一直存活,及時沒有任務需要執行
- 當執行緒數小於核心執行緒數時,即使有執行緒空閒,執行緒池也會優先建立新執行緒處理
- 設定allowCoreThreadTimeout=true(預設false)時,核心執行緒會超時關閉
- queueCapacity:任務佇列容量(阻塞佇列)
- 當核心執行緒數達到最大時,新任務會放在佇列中排隊等待執行
- maxPoolSize:最大執行緒數
- 當執行緒數>=corePoolSize,且任務佇列已滿時。執行緒池會建立新執行緒來處理任務
- 當執行緒數=maxPoolSize,且任務佇列已滿時,執行緒池會拒絕處理任務而丟擲異常
- keepAliveTime:執行緒空閒時間
- 當執行緒空閒時間達到keepAliveTime時,執行緒會退出,直到執行緒數量=corePoolSize
- 如果allowCoreThreadTimeout=true,則會直到執行緒數量=0
- allowCoreThreadTimeout:允許核心執行緒超時
- rejectedExecutionHandler:任務拒絕處理器
- 兩種情況會拒絕處理任務:
- 當執行緒數已經達到maxPoolSize,切佇列已滿,會拒絕新任務
- 當執行緒池被呼叫shutdown()後,會等待執行緒池裡的任務執行完畢,再shutdown。如果在呼叫shutdown()和執行緒池真正shutdown之間提交任務,會拒絕新任務
- 執行緒池會呼叫rejectedExecutionHandler來處理這個任務。如果沒有設定預設是AbortPolicy,會丟擲異常
- ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
- AbortPolicy 丟棄任務,拋執行時異常
- CallerRunsPolicy 執行任務
- DiscardPolicy 忽視,什麼都不會發生
- DiscardOldestPolicy 從佇列中踢出最先進入佇列(最後一個執行)的任務
- 實現RejectedExecutionHandler介面,可自定義處理器
- 當執行緒數小於核心執行緒數時,建立執行緒。
- 當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。
- 當執行緒數大於等於核心執行緒數,且任務佇列已滿
- 若執行緒數小於最大執行緒數,建立執行緒
- 若執行緒數等於最大執行緒數,丟擲異常,拒絕任務