Java併發程式設計 - Executor,Executors,ExecutorService, CompletionServie,Future,Callable
一、Exectuor框架簡介
Java從1.5版本開始,為簡化多執行緒併發程式設計,引入全新的併發程式設計包:java.util.concurrent及其併發程式設計框架(Executor框架)。 Executor框架是指java 5中引入的一系列併發庫中與executor相關的一些功能類,其中包括執行緒池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他們的關係為
在Executor框架中,使用執行器(Exectuor)來管理Thread物件,從而簡化了併發程式設計。
二、認識Exectuor(執行器)
1、併發程式設計的一種程式設計方式是把任務拆分為一系列的小任務,即Runnable,然後將這些任務提交給一個Executor執行,Executor.execute(Runnalbe) 。Executor在執行時使用其內部的執行緒池來完成操作。
Executor的子介面有:ExecutorService,ScheduledExecutorService,已知實現類:AbstractExecutorService,ScheduledThreadPoolExecutor,ThreadPoolExecutor。
2、Executor屬於public型別的介面。可以用於提交,管理或者執行Runnable任務。實現Executor介面的class還可以控制Runnable任務執行執行緒的具體細節。包括執行緒使用的細節、排程等。一般來說,Runnable任務開闢在新執行緒中的使用方法為:new Thread(new RunnableTask())).start()
3、但在Executor中,可以使用Executor而不用顯示地建立執行緒。例如,可以使用以下方法建立執行緒,而不是像第2點中為一種任務中的每個任務都呼叫new Thread(...)的方法。
Java程式碼
- Exectuor executor = anExecutor();
- executor.execute(new RunnableTask()); // 非同步執行
- executor.execute(new RunnableTask());
4、但是,Executor介面並沒有嚴格地要求執行必須是非同步/同步的,一切都相當自由。在最簡單的情況下,執行程式可以在呼叫者的執行緒中立即執行已提交的任務,
Java程式碼
- class DirectExecutor implements Executor {
- public void execute(Runnable r) {
- r.run();
- }
- }
更常見的是,任務在某個不是呼叫者執行緒的執行緒中執行的。如在另一個執行緒中啟動:
Java程式碼
- class ThreadPerTaskExecutor implements Executor {
- public void execute(Runnable r) {
- new Thread(r).start();
- }
- }
也可以在實現中用另一個Executor來序列化執行過程:
Java程式碼
- class SerialExecutor implements Executor {
- final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
- final Executor executor;
- Runnable active;
- SerialExecutor(Executor executor) {
- this.executor = executor;
- }
- public synchronized void execute(final Runnable r) {
- tasks.offer(new Runnable() {
- public void run() {
- try {
- r.run();
- } finally {
- scheduleNext();
- }
- }
- });
- if (active == null) {
- scheduleNext();
- }
- }
- protected synchronized void scheduleNext() {
- if ((active = tasks.poll()) != null) {
- executor.execute(active);
- }
- }
- }
5、ThreadPoolExecutor類提供了一個可供可擴充套件的執行緒池實現。Executors類為Executor介面及其實現提供了便捷的工廠方法。
6、 Executor中的方法execute。void execute(Runnable command)表示在未來的某個時間執行給定的命令。該命令可能在新的執行緒、已經入池的執行緒或者正在呼叫的執行緒中執行。
三、Executors類: 主要用於提供執行緒池相關的操作
Executors類,提供了一系列工廠方法用於建立執行緒池,返回的執行緒池都實現了ExecutorService介面。
1、public static ExecutorService newFiexedThreadPool(int Threads) 建立固定數目執行緒的執行緒池。
2、public static ExecutorService newCachedThreadPool():建立一個可快取的執行緒池,呼叫execute 將重用以前構造的執行緒(如果執行緒可用)。如果沒有可用的執行緒,則建立一個新執行緒並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。
3、public static ExecutorService newSingleThreadExecutor():建立一個單執行緒化的Executor。
4、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
四、ExecutorService與生命週期
1、ExecutorService可以理解為程式設計師提供了一堆操作Executor的API
2、ExecutorService擴充套件了Executor並添加了一些生命週期管理的方法。一個Executor的生命週期有三種狀態
執行、關閉和終止。
Executor建立時處於執行狀態。當呼叫ExecutorService.shutdown()後,處於關閉狀態,isShutdown()方法返回true。這時,不應該再向Executor中新增任務,所有已新增的任務執行完畢後,Executor處於終止狀態,isTerminated()返回true。如果Executor處於關閉狀態,往Executor提交任務會丟擲unchecked exception RejectedExecutionException。
3、本質
介面ExecutorService 表述了非同步執行的機制,並且可以讓任務在後臺執行。一個ExecutorService 例項因此特別像一個執行緒池。事實上,在 java.util.concurrent 包中的 ExecutorService 的實現就是一個執行緒池的實現。
Java程式碼
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- executorService.execute(new Runnable() {
- public void run() {
- System.out.println("Asynchronous task");
- }
- });
- executorService.shutdown();
該示例程式碼首先使用 newFixedThreadPool() 工廠方法建立一個ExecutorService ,上述程式碼建立了一個可以容納10個執行緒任務的執行緒池。其次,向 execute() 方法中傳遞一個非同步的 Runnable 介面的實現,這樣做會讓 ExecutorService 中的某個執行緒執行這個Runnable 執行緒。
4、任務的委託
下方展示了一個執行緒的把任務委託非同步執行的ExecutorService的示意圖。
一旦執行緒把任務委託給 ExecutorService,該執行緒就會繼續執行與執行任務無關的其它任務。
5、ExecutorService 的實現
由於 ExecutorService 只是一個介面,ExecutorService 介面在 java.util.concurrent 包中有如下實現類:
6、ExecutorService 使用方法
這裡有幾種不同的方式讓你將任務委託給一個ExecutorService:
Java程式碼
- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny()
- invokeAll()
7、execute(Runnable)
方法 execute(Runnable) 接收一個java.lang.Runnable 物件作為引數,並且以非同步的方式執行它。如下是一個使用 ExecutorService 執行 Runnable 的例子:
Java程式碼
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- executorService.execute(new Runnable() {
- public void run() {
- System.out.println("Asynchronous task");
- }
- });
- executorService.shutdown();
使用這種方式沒有辦法獲取執行 Runnable 之後的結果,如果你希望獲取執行之後的返回值,就必須使用接收 Callable 引數的 execute() 方法。接下來會提到。
8、submit(Runnable)
方法 submit(Runnable) 同樣接收一個Runnable 的實現作為引數,但是會返回一個Future 物件。這個Future 物件可以用於判斷 Runnable 是否結束執行。如下是一個ExecutorService 的 submit() 方法的例子:
Java程式碼
- Future future = executorService.submit(new Runnable() {
- public void run() {
- System.out.println("Asynchronous task");
- }
- });
- //如果任務結束執行則返回 null
- System.out.println("future.get()=" + future.get());
9、submit(Callable)
方法 submit(Callable) 和方法 submit(Runnable) 比較類似,但是區別則在於它們接收不同的引數型別。Callable 的例項與 Runnable 的例項很類似,但是 Callable 的 call() 方法可以返回一個結果。方法 Runnable.run() 則不能返回結果。
Callable 的返回值可以從方法 submit(Callable) 返回的 Future 物件中獲取。如下是一個 ExecutorService Callable 的樣例:
Java程式碼
- Future future = executorService.submit(new Callable(){
- public Object call() throws Exception {
- System.out.println("Asynchronous Callable");
- return "Callable Result";
- }
- });
- System.out.println("future.get() = " + future.get());
上述樣例程式碼會輸出如下結果:
Java程式碼
- Asynchronous Callable
- future.get() = Callable Result
10、inVokeAny()
方法 invokeAny() 接收一個包含 Callable 物件的集合作為引數。呼叫該方法不會返回 Future 物件,而是返回集合中某一個Callable 物件的結果,而且無法保證呼叫之後返回的結果是哪一個 Callable,只知道它是這些 Callable 中一個執行結束的 Callable 物件。如果一個任務執行完畢或者丟擲異常,方法會取消其它的 Callable 的執行。
以下是一個樣例:
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Set<Callable<String>> callables = new HashSet<Callable<String>>();
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 1";
- }
- });
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 2";
- }
- });
- callables.add(new Callable<String>() {
- public String call() throws Exception {
- return "Task 3";
- }
- });
- String result = executorService.invokeAny(callables);
- System.out.println("result = " + result);
- executorService.shutdown();
以上樣例程式碼會打印出在給定的集合中的某一個Callable 的返回結果。嘗試執行後發現每次結果都在改變。有時候返回結果是"Task 1",有時候是"Task 2",等等。
11、invokeAll()
方法 invokeAll() 會呼叫存在於引數集合中的所有 Callable 物件,並且返回一個包含 Future 物件的集合,你可以通過這個返回的集合來管理每個 Callable 的執行結果。需要注意的是,任務有可能因為異常而導致執行結束,所以它可能並不是真的成功運行了。但是我們沒有辦法通過 Future 物件來了解到這個差異。
12、ExecutorService服務的關閉
當使用 ExecutorService 完畢之後,我們應該關閉它,這樣才能保證執行緒不會繼續保持執行狀態。
舉例來說,如果你的程式通過 main() 方法啟動,並且主執行緒退出了你的程式,如果還有一個活動的 ExecutorService 存在於程式中,那麼程式將會繼續保持執行狀態。存在於 ExecutorService 中的活動執行緒會阻止Java虛擬機器關閉。
為了關閉在 ExecutorService 中的執行緒,需要呼叫 shutdown() 方法。但ExecutorService 並不會馬上關閉,而是不再接收新的任務,一旦所有的執行緒結束執行當前任務,ExecutorServie 才會真的關閉。所有在呼叫 shutdown() 方法之前提交到 ExecutorService 的任務都會執行。
如果你希望立即關閉 ExecutorService,你可以呼叫 shutdownNow() 方法。這個方法會嘗試馬上關閉所有正在執行的任務,並且跳過所有已經提交但是還沒有執行的任務。但是對於正在執行的任務,是否能夠成功關閉它是無法保證的,有可能他們真的被關閉掉了,也有可能它會一直執行到任務結束。這是一個最好的嘗試。
五、CompletionService
根據上面的介紹我們知道,現在在Java中使用多執行緒通常不會再使用Thread物件了。而是會用到java.util.concurrent包下的ExecutorService來初始化一個執行緒池供我們使用。使用ExecutorService類的時候,我們常維護一個list儲存submit的callable task所返回的Future物件。然後在主執行緒中遍歷這個list並呼叫Future的get()方法取到Task的返回值。
其實除了使用ExecutorService外,還可通過CompletionService包裝ExecutorService,然後呼叫其take()方法去取Future物件。
CompletionService和ExecutorService的主要的區別在於submit的task不一定是按照加入自己維護的list順序完成的。
ExecutorService中從list中遍歷的每個Future物件並不一定處於完成狀態,這時呼叫get()方法就會被阻塞住,如果系統是設計成每個執行緒完成後就能根據其結果繼續做後面的事,這樣對於處於list後面的但是先完成的執行緒就會增加了額外的等待時間。
而CompletionService的實現是維護一個儲存Future物件的BlockingQueue。只有當這個Future物件狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future物件,如果Queue是空的,就會阻塞在那裡,直到有完成的Future物件加入到Queue中。所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。
六、使用Callable,Future返回結果
Future<V>代表一個非同步執行的操作,通過get()方法可以獲得操作的結果,如果非同步操作還沒有完成,則,get()會使當前執行緒阻塞。FutureTask<V>實現了Future<V>和Runable<V>。Callable代表一個有返回值的操作。
- Callable<Integer> func = new Callable<Integer>(){
- public Integer call() throws Exception {
- System.out.println("inside callable");
- Thread.sleep(1000);
- return new Integer(8);
- }
- };
- FutureTask<Integer> futureTask = new FutureTask<Integer>(func);
- Thread newThread = new Thread(futureTask);
- newThread.start();
- try {
- System.out.println("blocking here");
- Integer result = futureTask.get();
- System.out.println(result);
- } catch (InterruptedException ignored) {
- } catch (ExecutionException ignored) {
- }
ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。如果Executor後臺執行緒池還沒有完成Callable的計算,則呼叫返回Future物件的get()方法,會阻塞直到計算完成。
Java5以後可以利用Future來跟蹤非同步計算的結果。在此之前主執行緒要想獲得工作執行緒(非同步計算執行緒)的結果是比較麻煩的事情,需要我們進行特殊的程式結構設計,比較繁瑣而且容易出錯。有了Future我們就可以設計出比較優雅的非同步計算程式結構模型:根據分而治之的思想,我們可以把非同步計算的執行緒按照職責分為3類:
1. 非同步計算的發起執行緒(控制執行緒):負責非同步計算任務的分解和發起,把分解好的任務交給非同步計算的work執行緒去執行,發起非同步計算後,發起執行緒可以獲得Futrue的集合,從而可以跟蹤非同步計算結果。
2. 非同步計算work執行緒:負責具體的計算任務
3. 非同步計算結果收集執行緒:從發起執行緒那裡獲得Future的集合,並負責監控Future的狀態,根據Future的狀態來處理非同步計算的結果。