Java併發程式設計系列之十五 Executor框架
Java使用執行緒完成非同步任務是很普遍的事,而執行緒的建立與銷燬需要一定的開銷,如果每個任務都需要建立一個執行緒將會消耗大量的計算資源,JDK 5之後把工作單元和執行機制區分開了,工作單元包括Runnable和Callable,而執行機制則由Executor框架提供。Executor框架為執行緒的啟動、執行和關閉提供了便利,底層使用執行緒池實現。使用Executor框架管理執行緒的好處在於簡化管理、提高效率,還能避免this逃逸問題——是指不完整的物件被執行緒呼叫。
Executor框架使用了兩級排程模型進行執行緒的排程。在上層,Java多執行緒程式通常把應用分解為多個任務,然後使用使用者排程框架Executor將這些任務對映為固定數量的執行緒;在底層,作業系統核心將這些執行緒對映到硬體處理器上。
Executor框架包括執行緒池,Executor,Executors,ExecutorService,CompletionService,Future,C
allable 等。
主執行緒首先通過Runnable或者Callable介面建立任務物件。工具類Executors可以把一個Runnable物件封裝為Callable物件(通過呼叫Executors.callable(Runnable task)實現),然後可以把Runnable物件直接交給ExecutorService執行,ExecutorService通過呼叫ExecutorService.execute(Runnable command)完成任務的執行;或者把Runnable物件或Callable物件交給ExecutorService執行,ExecutorService通過呼叫ExecutorService.submit(Runnable task)或者ExecutorService.submit(Callable task)完成任務的提交。在使用ExecutorService的submit方法的時候會返回一個實現Future介面的物件(目前返回的是FutureTask物件)。由於FutureTask實現了Runnable,也可以直接建立FutureTask,然後交給ExecutorService執行。
ExecutorService 介面繼承自 Executor 介面,它提供了更豐富的實現多執行緒的方法。比如可以呼叫 ExecutorService 的 shutdown()方法來平滑地關閉 ExecutorService,呼叫該方法後,將導致 ExecutorService 停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉 ExecutorService。
通過Executors工具類可以建立不同的執行緒池ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads)public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory factory)
- 1
- 2
FixedThreadPool適用於為了滿足管理資源的需求,而需要限制當前執行緒數量的應用場景,它適用於負載比較重的伺服器。
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor()public static ExecutorService newSingleThreadExecutor(ThreadFactory factory)
- 1
- 2
SingleThreadExecutor適用於需要保證順序地執行各個任務,並且在任意時間點不會有多個執行緒在活動的場景。
CachedThreadPool
public static ExecutorService newCachedThreadPool()public static ExecutorService newCachedThreadPool(ThreadFactory factory)
- 1
- 2
CachedThreadPool是大小無界的執行緒池,適用於執行很多的短期非同步任務的小程式,或者負載比較輕的伺服器。
ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory factory)
- 1
- 2
建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。ScheduledThreadPoolExecutor適用於需要在多個後臺執行緒執行週期任務,同時為了滿足資源管理需求需要限制後臺執行緒數量的應用場景。
Executor框架的最核心的類是ThreadPoolExecutor,它是執行緒池的實現類,主要由四個元件構成。
- corePool:核心執行緒池的大小
- maximumPool:最大執行緒池的大小
- BlockingQueue:用來暫時儲存任務的工作佇列
- RejectedExecutionHandler:飽和策略。當ThreadPoolExecutor已經關閉或者ThreadPoolExecutor已經飽和時(是指達到了最大執行緒池的大小且工作佇列已滿),execute方法將要呼叫的Handler
使用Executor框架執行Runnable任務
package com.rhwayfun.concurrency;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Created by rhwayfun on 16-4-4. */public class ExecutorRunnableTest { static class Runner implements Runnable{ public void run() { System.out.println(Thread.currentThread().getName() + " is called"); } } public static void main(String[] args){ ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++){ cachedThreadPool.execute(new Runner()); } cachedThreadPool.shutdown(); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
結果如下:
通過下面對CachedThreadPool的分析就能知道執行任務的時候首先會從執行緒池選擇空閒的執行緒執行任務,如果沒有沒有空閒的執行緒就會建立一個新的執行緒執行任務。這裡出現同一個執行緒執行兩遍的原因在於第一次執行任務的空閒執行緒執行完任務後不會馬上終止,認識等待60秒才會終止。
使用Executor框架執行Callable任務
Runnable 任務沒有返回值而 Callable 任務有返回值。並且 Callable 的call()方法只能通過 ExecutorService 的 submit(Callable task) 方法來執行,並且返回一個 Future(目前是FutureTask),是表示任務等待完成的 Future。如果需要得到Callable執行返回的結果,可以通過吊桶FutureTask的get方法得到。
下面的程式碼演示使用Executor框架執行Callable任務:
package com.rhwayfun.concurrency;import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;/** * Created by rhwayfun on 16-4-4. */public class ExecutorCallableTest { /** * Callable任務 */ static class Runner implements Callable<String> { private String runId; public Runner(String runId) { this.runId = runId; } public String call() throws Exception { System.out.println(Thread.currentThread().getName() + " call method is invoked!"); return Thread.currentThread().getName() + " call method and id is " + runId; } } public static void main(String[] args) { //執行緒池 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); //接收Callable任務的返回結果 List<Future<String>> futureTaskList = new ArrayList<Future<String>>(); for (int i = 0; i < 5; i++) { Future<String> future = cachedThreadPool.submit(new Runner(String.valueOf(i))); futureTaskList.add(future); } //遍歷執行緒執行的返回結果 for (Future f : futureTaskList) { try { //如果任務沒有完成則忙等待 while (!f.isDone()) {} System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { //關閉執行緒池,不再接收新的任務 cachedThreadPool.shutdown(); } } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
程式的執行結果如下:
submit 方法也是首先選擇空閒執行緒來執行任務,如果沒有,才會建立新的執行緒來執行任務。如果 Future 的返回尚未完成則 get()方法會阻塞等待直到 Future 完成返回。
FixedThreadPool詳解
建立FixedThreadPool的原始碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 1
- 2
- 3
- 4
- 5
其corePoolSize和maximumPoolSize都被設為nThreads的值。當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime為多餘的空閒執行緒等待新任務的最長時間,超過這個時間後多餘的執行緒將被終止。具體在FixedThreadPool的執行過程如下:
- 如果當前執行的執行緒數少於corePoolSize,就建立新的執行緒執行任務
- 線上程池如果當前執行的執行緒數等於corePoolSize時,將任務加入到LinkedBlockingQueue等待執行
- 執行緒執行完1中的任務後,會在迴圈中反覆從LinkedBlockingQueue獲取任務來執行
由於LinkedBlockingQueue使用的無界佇列,所以執行緒池中執行緒數不會超過corePoolSize,因此不斷加入執行緒池中的任務將被執行,因為不會馬上被執行的任務都加入到LinkedBlockingQueue等待了。
CachedThreadPool詳解
CachedThreadPool是一個根據需要建立執行緒的執行緒池。建立一個CachedThreadPool的原始碼如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 1
- 2
- 3
- 4
- 5
由原始碼可以看出,CachedThreadPool的corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,keepAliveTime為60L,意味著多餘的空閒執行緒等待新任務的執行時間為60秒。
CachedThreadPool使用沒有容量的SynchronousQueue作為執行緒池的工作佇列(SynchronousQueue是一個沒有容量的阻塞佇列,每個插入操作必須等待另一個執行緒的對應移除操作),但是CachedThreadPool的maximumPool是無界的。這就意味著如果執行緒的提交速度高於執行緒的處理速度,CachedThreadPool會不斷建立執行緒,極端情況是因為建立執行緒過多耗盡CPU和記憶體資源。
CachedThreadPool的執行過程如下:
- 首先執行SynchronousQueue的offer方法。如果maximumPool有空閒執行緒正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那麼主執行緒執行offer操作與空閒執行緒的poll操作配對成功,主執行緒把任務交給空閒執行緒執行,否則執行2
- 如果maximumPool為空或者maximumPool沒有空閒執行緒時,CachedThreadPool將會建立一個新執行緒執行任務
- 在步驟2新建立的執行緒將任務執行完後,將會在SynchronousQueue佇列中等待60秒,如果60秒內主執行緒提交了新任務,那麼將繼續執行主執行緒提交的新任務,否則會終止該空閒執行緒。
ScheduledThreadPoolExecutor詳解
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,主要用來在給定的延遲之後執行任務,或者定期執行任務。Timer類也具有類似的功能,Timer對應的單個的後臺執行緒,而ScheduledThreadPoolExecutor可以在建構函式內指定多個對應的後臺執行緒。
ScheduledThreadPoolExecutor為了支援週期性任務的執行,使用了DelayQueue作為任務佇列。ScheduledThreadPoolExecutor會把待排程的任務(該任務是ScheduledFutureTask)放到DelayQueue中,執行緒池中的執行緒從DelayQueue中獲取要執行的定時任務並執行。
ScheduledFutureTask包含了3個變數:
- long型變數time,是任務具體的執行時間
- long型變數sequenceNumber,是這個任務被新增到ScheduledThreadPoolExecutor中的序號
- long型成員period,表示任務執行的間隔週期
下面是ScheduledThreadPoolExecutor具體的執行步驟:
- 執行緒從DelayQueue中獲取已經到期的ScheduledFutureTask。到期任務是指time大於等於當前時間的任務
- 執行緒執行這個過期任務
- 執行緒修改這個任務的time變數為下次執行的時間(當前時間加上間隔時間)
- 執行緒把修改後的任務放回DelayQueue,過期後會被重新執行