Java執行緒之執行緒池的使用
在Java中進行非同步操作時,執行緒必不可少,但如果頻繁的建立、銷燬一個執行緒,這是很耗效能的,所以執行緒池就應運而生了,Java中主要有newFixedThreadPool
、newCachedThreadPool
、newSingleThreadExecuter
及newScheduledThreadPool
這四種執行緒池。
用法 | 具體實現 | |
---|---|---|
newFixedThreadPool | Executors.newFixedThreadPool(int) Executors.newFixedThreadPool(int,ThreadFactory) | new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) |
newCachedThreadPool | Executors.newCachedThreadPool() Executors.newCachedThreadPool(ThreadFactory) |
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); |
newSingleThreadExecuter | Executors.newSingleThreadExecutor() Executors.newSingleThreadExecutor(ThreadFactory) |
new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); |
newScheduledThreadPool | Executors.newScheduledThreadPool(int) Executors.newScheduledThreadPool(int,ThreadFactory) | new ScheduledThreadPoolExecutor(corePoolSize); |
注
ScheduledThreadPoolExecutor
是extend與ThreadPoolExecutor
的子類,傳入引數如下super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
- newFixedThreadPool:將建立一個固定長度的執行緒池,每當提交一個任務時將建立一個執行緒。直到達到執行緒池的最大數量,這時執行緒池的規模將不再變化(如果某個執行緒發生了未預測的Exception而結束,那麼執行緒池會補充一個新的執行緒)
- newCachedThreadPool:將建立一個可快取的執行緒池,如果執行緒池的規模超過了處理需求時,那麼將回收空閒的執行緒,而當需求增加時,則可以新增新的執行緒,執行緒池的規模不存在任何限制
- newSingleThreadExecuter:是一個單執行緒的Executor,它建立單個工作者來執行任務,如果這個執行緒異常結束,會建立另一個執行緒來替代,newSingleThreadExecuter確保依照任務在佇列中的順序來序列執行。
- newScheduledThreadPool:建立一個固定長度的執行緒池,而且以延遲或定時的方式來執行任務,類似於Timer。
ExecutorService
由於執行緒池都是實現了Executor介面,但Executor沒有生命週期,所以為了解決執行服務的生命週期問題,Executor擴充套件了ExecutorService介面,添加了一些用於生命週期的管理方法(同時還有一些用於任務提交的便利方法)。
- shutdown():執行平緩的關閉過程,不在接受新的任務,同時等待已經提交的任務執行完成(包括那些還未開始執行的任務)
- shutdownNow():將嘗試取消所有執行中的任務,返回所有未執行的任務
- isTerminated():可以判斷ExecutorService是否已經終止。可以通過輪詢來判斷
- awaitTermination(long timeout, TimeUnit unit):等待timeout後判斷是否終止
newScheduledThreadPool
在Java中,Timer負責管理延遲任務以及週期性任務,然而Timer存在以下的一些缺陷,
- Timer在執行所有定時任務時只會建立一個執行緒,如果某個任務的執行時間過長,那會將破壞其他TimerTask的定時精確性。而執行緒池能彌補這個缺陷,它可以提供多個執行緒來執行延時任務和週期任務
- 如果TimerTask丟擲一個未檢查的異常,那麼Timer將表現出糟糕的行為。Timer執行緒並不捕獲異常,因此當TimerTask丟擲未檢查的異常時將終止定時執行緒。在這種情況下,Timer也不會恢復執行緒的執行,而是會錯誤的認為整個Timer都被取消了。因此,已經被排程但尚未執行的TimerTask將不會執行,新的任務也不能被排程
因此如果要構建自己的排程服務,可以使用DelayQueue,它實現了BlockingQueue,併為newScheduledThreadPool(RxJava就是採用的newScheduledThreadPool來管理延遲任務及週期性任務的)提供排程功能。
攜帶結果的任務Callable與Future
Executor框架使用Runnable作為其基本的任務表示形式。Runnable是一種有很大侷限的抽象,雖然run能寫入到日誌檔案或者將結果放入某個共享的資料結構,但它不能返回一個值或者丟擲一個受檢查的異常。Callable是一種更好的抽象:它認為主入口點(即call)將返回一個值,並可能丟擲一個異常。
Future表示一個任務的生命週期,並提供相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。在Future規範中包含的隱含意義是,任務的生命週期只能前進,不能後退,就像ExecutorService的生命週期一樣,當某個任務完成後,它就永遠停留在完成狀態上。Future有如下方法:
cancel(boolean mayInterruptIfRunning) | isCancelled() | isDone() | get() | get(long timeout, TimeUnit unit) |
---|---|---|---|---|
取消任務 | 判斷是否取消 | 當前任務是否執行完成 | 拿到返回的值 如果任務已經完成則立即返回一個值或者拋異常 如果任務未完成,則將阻塞直任務完成 |
同get(),但加了時間限制,如果在指定時間內未拿到值則拋異常 |
CompletionService:Executor與BlockingQueue
如果向Executor提交一組任務,並且希望在計算完成後獲得結果,那麼可以保留與每個任務關聯的Future,然後反覆使用get方法,同時將引數timeout指定為0,從而通過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務(CompletionService)
CompletionService將Executor和BlockingQueue的功能融合在一起。你可以將Callable任務提交給它來執行,然後使用類似佇列的操作take和poll等方法來獲得已完成的結果,而這些結果會在完成時將被封裝為Future。ExecutorCompletionService實現了CompletionService,並將計算結果委託給一個Executor。