5.2探究執行器(Executors)
你可以通過java提供的執行緒API來執行任務,像建立一個java.lang.Thread(newRunnableTask()).start();這個應用在可以執行多工的機器表現更為明顯(運行當前執行緒,建立執行緒和線上程池中隨意選擇執行執行緒)。
Note 一個任務就是一個物件,它會繼承java.lang.Runnable的介面(執行的任務)或者繼承java.util.concurrent.Callable的介面(立刻執行的任務)。
併發工具包括高水平的執行器(executors),取代低水平的執行緒任務執行應用的工具。一個執行物件如果直接或間接繼承java.util.concurrent.Executor介面,那麼它就會從任務執行的器中解耦。
Note 這個執行器框架的運用介面,使其從任務執行中解耦任務,就像是集合框架運用介面從list、sets、queues和maps的介面中解耦是一樣的。解耦的好處是程式碼靈活的容易維護。
使用Excutor時,我們需要宣告它的唯一方法execute(Runnable runnable),那麼就會執行runnable命名的任務。如果runnable為nul將會報錯:java.lang.NullPointExcetion,如果不能執行runnable的任務,那麼會報錯:java.util.concurrent.RejectedExecutionException。
Note 如果一個執行緒被強迫停止或不想接收新有任務時,將會丟擲RejectedExecutionException.當然,如果執行器沒有足夠的空間去儲存任務時,也會報出這個錯誤。(可能執行器使用一個有界的阻塞佇列去儲存任務和為個佇列滿了。)
下面例子使用Executor,與前面講的new Thread(new RunnableTask()).start()的方法是一樣的。如:
Executor executor = ···;//···代表著相同的執行器,建立executor.execute(new //RunnableTask());
儘管Executor是很容易被使用的,但是這個介面存如下的限制:
Executor的焦點是在Runnable上。因為Runnable的run()方法不能夠返回任何值,這裡也沒有任務方式讓一個執行的任務返回值給它的請求者。
Executor沒有提供任何的方法,讓一個正在執行任務的執行緒取消或終止。
Executor不能夠執行一個集合的執行任務。
Executor不能夠讓一個應用強迫停止應用。
以上的這些限制可以使用java.util.concurrent.ExcutorService的介面來解決,這個介面繼承Executor和它的應用是一個執行緒池。
表5-1 ExecutorService的方法
方法 |
說明 |
boolean awaitTermination(long timeout, TimeUnit unit) |
阻塞直到所有任務的執行緒都完成。如果執行緒超時或當前執行緒強迫停止,那麼執行緒將會停止請求。當executor正常結束,那麼將會返回true,否則線上程執行期間超時,將返回false。當執行緒被打斷,那麼將會報錯:java.lang.InterruptedException. |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) |
執行每一個即時響應的任務集合時,將會返回java.util.concurrent.Future的java.util.List。List的集合中包含著任務正常結束狀態或由於錯誤而報出異常的狀態。Futures的List是在相同的有序佇列中的,這些任務佇列是通過任務的迭代器(iterator)返回的。如果執行緒在等待時被打斷或任務任務沒有完成就被取消,那麼這個方法將會丟擲InterruptedException的錯誤。如果tasks為空或任務一個元素為空,那麼將會丟擲NullPointException。如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。 |
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
執行每一個即時響應的任務集合時,將會返回java.util.concurrent.Future的java.util.List。List的集合中包含著任務正常結束狀態或由於錯誤而報出異常的狀態,或超時終止。任務如果取消了,那麼將會終止。Futures的List是在相同的有序佇列中的,這些任務佇列是通過任務的迭代器(iterator)返回的。如果執行緒在等待時被打斷或任務任務沒有完成就被取消,那麼這個方法將會丟擲InterruptedException的錯誤。如果tasks為空或任務一個元素為空,或unit為空,那麼將會丟擲NullPointException。如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks) |
執行執行緒所給的任務,返回任何任務的成功執行。也就是說執行的任務不能報錯。如果執行的任務不正常或報錯,那麼任務(tasks)將不成功或不能正常結束。如果執行緒在等待時候被打斷,那麼將會丟擲InterruptedException;如果tasks或任何元素為空,那麼將會丟擲NullPointerException;當tasks中沒有元素,那麼將會丟擲java.lang.IllegalArgumentException.當沒有任務成功執行,將丟擲ExecutionException; 如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。 |
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) |
執行執行緒所給的任務,返回任何任務的成功執行。也就是說執行的任務不能報錯。如果執行的任務不正常或報錯或超時,那麼任務(tasks)將不成功或不能正常結束。如果執行緒在等待時候被打斷,那麼將會丟擲InterruptedException;如果tasks或任何元素為null或unit為null,那麼將會丟擲NullPointerException;當tasks中沒有元素,那麼將會丟擲java.lang.IllegalArgumentException.當沒有任務成功執行,將丟擲ExecutionException; 如果一個任務不能被排序來執行,那麼將會丟擲RejectedExecutionException。當任務成功執行之前卻超時了,那麼將會丟擲TimeoutException。 |
boolean isShutdown() |
當執行器被關閉,那麼將返回true,其它情況將返回false. |
boolean isTerminated() |
當所有任務都執行成功並且關閉,返回true;其它情況返回false。如果執行shutdown()或shutdownNow()方法,那麼這個方法將不會返回true. |
void shutdown() |
在所有的任務都提交之後,就開始的序的關閉任務,這期間將不會接受任何新有的任務。如果執行器(excutors)被關閉了,那麼執行這個方法是沒有任何作用的。如果之間的任務已經成功執行並提交了,那麼這個方法將不會等待,而是立即執行。如果需要等待,那麼就使用awaitTermination()的方法。 |
List<Runnable> shutdownNow() |
嘗試去停止正在執行的任務,或停止正在等待的任務的執行緒,那麼將會返回的是等待執行任務的集合。這裡不能夠保證可以順利停止每一個任務,比如,應用被Thread.interrupt()取消了,那麼任務請求去停止,是沒有作用的。 |
<T> Future<T> submit(Callable<T> task) |
提交一個即時響應的任務,將會返回Future的例項,代表著等待任務的處理結果返回。Future的例項中的get()方法,返回任務的成功執行情況。當任務(task)不能有序執行時,將會丟擲RejectedExecutionException.如果你想更快速的阻塞一個等待的任務時,那麼你可以用這個方法result = exec.submit(aCallable).get() |
Future<?> submit(Runnable task) |
提交一個執行的任務去執行,直到有結果返回。Future 的例項中的get()方法返回任務成功執行與否。如果這個任務不能有序的執行,那麼將會丟擲RejectedExecutionException;如果引數task為null,那麼將會丟擲NullPointerException. |
<T> Future<T> submit(Runnable task, T result) |
提交一個執行的任務去執行,直到有結果返回。Future 的例項中的get()方法返回任務成功執行與否,通過result來表現。如果這個任務不能有序的執行,那麼將會丟擲RejectedExecutionException;如果引數task為null,那麼將會丟擲NullPointerException. |
表格5-1涉及到java.util.concurrent.TimeUnit,這個工具類中包含執行時間期間的粒度,其中包含的列舉有:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS、和SECONDS。與此同時,TimeUnit宣告時間轉換的工具(如:long toHours(long duration)),和執行和延遲的操作(如:voidsleep(long timeout))。
表格5-1涉及到即時響應的任務(callabletasks)。它不像Runnable中的run()方法,不能夠返回值和不能夠丟擲異常。Callable<V> 的 V call()方法將會返回值和丟擲異常的,因為它聲明瞭throws Exception。
最後,表格5-1涉及到了Future的介面,這個代表著同步計算的結果。它返回的結果是當作一個future,通常它是無效的,除非的相同的時刻的future.Future的屬性是Future<V>,提供方法去取消一個任務,因為它可以返回值和確保任務是否完成。表格5-2描述了Future的方法。
表格5-2 Future的方法
方法 |
說明 |
boolean cancel(boolean mayInterrupIfRunning) |
嘗試取消執行的任務,和取消一個任務返回true;其它情況將返回false(這個任務在cancel()之前不能夠正常取消)。當一個任務已經完成、取消或其它原因不能被取消,那麼呼叫這個方法將會失敗。如果已經成功或任務尚未開始,那麼這個任務將不會執行。如果任務已經開始,那麼可以通過mayInterruptIfRunning宣告true或false,設定是否將任務停止或打斷。在執行之後,呼叫isDone()方法返回true,當呼叫cancel()方法返回true,那麼isCancelled()總是返回true。 |
V get() |
如果需要任務成功執行然後返回結果,那麼可以呼叫此方法。當這個方法在取消之前,任務先取消了,那麼將會丟擲CancellaException.當任務的異常時,就會丟擲ExecutionException.如果當前的執行緒在等待期間被打斷,那麼將會丟擲InterruptedException |
V get(long timeout, TimeUnit nuit) |
等待timeout的時間,等待任務成功執行或有需要的話返回結果。當這個方法在取消之前,任務先取消了,那麼將會丟擲CancellaException.當任務的異常時,就會丟擲ExecutionException.如果當前的執行緒在等待期間被打斷,那麼將會丟擲InterruptedException如果方法超時了,那麼將要丟擲TimeoutException. |
boolean isCancelled() |
如果任務正常執行之前被取消了,那麼這個方法將會返回true,其它情況將會返回false。 |
boolean isDone() |
如果任務結束,那麼返回true,其它情況返回false。結束情況,可能是非正常結束,異常或取消。這些情況,方法將返回true. |
想像一下,你試圖寫這樣的一個應用,提供圖形表格讓使用者輸入單詞。使用者輸入單詞之後,就可以通過線上的詞典來查詢這個單詞,並將查詢的結果顯示給使用者。
因為線上的應用可能會因為網速而比較慢,而且使用者的互動需要請求和響應。你需要將請求的單詞(這個任務)放到執行器中執行,而且返回這個執行緒執行的結果。下面的例子將會使用ExecutorService、Callable和Future來實現這個功能。
ExecutorService executor = ...; // ... represents some executor creation
Future<String[]> taskFuture =
executor.submit(new Callable<String[]>()
{
@Override
public String[] call()
{
String[] entries = ...;
// Access online dictionaries
// with search word and populate
// entries with their resulting
// entries.
return entries;
}
});
// Do stuff.
String entries = taskFuture.get();
之後在相同的方法中包含一個執行器(executor),這個例子執行緒提交一個即時響應的任務給執行器(executor)。為了去控制任務的執行和獲取結果,submit()方法將會更快速地返回一個介面給Future物件。
Note java.util.concurrent.ScheduleExecutorService的介面繼承ExecutorService。和描述一個執行器讓你的任務列表執行,或執行緒被延遲了,那麼將會一直迴圈執行。
儘管你可以建立自己的Excutor、ExecutorService和ScheduledExcutorService的實現工具(例如,DirectExcutorimplements Excutor{@override public void execute(Runnable r){r.run();}} ----執行這個執行器(executor)將會呼叫執行緒)。這個簡單的例子也可以用java.concurrent.Executors來替換。
Note 如果你想要建立自己的ExcutorService的應用工具,那你java.util.concurrent.AbstractExecutorService和java.util.concurrent.FutureTask的類是非常有用的。
這個Executors的作用聲明瞭幾個類的方法可以返回多個的例項,如ExecutorService和ScheduledExecutorService的應用工具(或者其它的例項應用)。這個類的方法可以有如下的功效:
建立和返回一個ExecutorService的例項,這個構造一個普通的應用或構造設定。
建立和返回一個ScheduledExecutorSevice的例項,這個構造一個普通的應用或構造設定。
建立和返回一個“擦除(wrapped)”的ExcutorService 或ScheduledExecutorService的例項;這個對於具體應用方法是難以見效的。
建立和返回一個java.util.concurrent.ThreadFactory的例項(這個類是繼承ThreadFactory的介面),為了建立一個執行緒物件。
建立和返回一個Callable的例項而不是一個關閉的物件,以至於執行這個方法可以請求Callable的引數(例如,ExecutorService 的submit(Callable)方法)。
例如,staticExecutorService newFixedThreadPool(int nThreads)建立一個執行緒池,這個執行緒池會在沒的邊界的佇列中使用固定的引數。在大多數情況下,nThread執行緒是活躍的程序任務。當所有執行緒都在執行時,如果有新的執行緒提交任務,那麼將處於等待狀態。
如果一個執行器(executor)關閉,那麼當前執行緒在執行期間就會失敗,所以當前執行的執行緒也就會終止。這個的話,執行緒池就會讓出這個空間給其它需要的執行緒來執行。執行緒明確停止了,但是執行緒池還是存在的。這個時候就會丟擲IllegalArgumentException,因為nThreads為‘0’或無效的值。
Note 建立一個執行緒而且要經常提交任務,那麼執行緒池作用就是用來消除這樣的高開銷。執行緒的建立不是低開銷的,因為建立多個執行緒會影響到系統的效能。
你可能經常運用executors、runnable、callables和futures在檔案和網路的輸入/輸出的應用中。執行一個長的計算,你可能也會用到。下面的例子給出了運用executor、callable、future在尤拉公式(Euler)的計算。
package com.owen.thread.chapter5;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CalculateE
{
final static int LASTITER = 17;
public static void main(String[] args)
{
ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<BigDecimal> callable;
callable = new Callable<BigDecimal>()
{
@Override
public BigDecimal call()
{
MathContext mc = new MathContext(100, RoundingMode.HALF_UP);
BigDecimal result = BigDecimal.ZERO;
for (int i = 0; i <= LASTITER; i++)
{
BigDecimal factorial = factorial(new BigDecimal(i));
BigDecimal res = BigDecimal.ONE.divide(factorial, mc);
result = result.add(res);
}
return result;
}
public BigDecimal factorial(BigDecimal n)
{
if (n.equals(BigDecimal.ZERO))
return BigDecimal.ONE;
else
return n.multiply(factorial(n.subtract(BigDecimal.ONE)));
}
};
Future<BigDecimal> taskFuture = executor.submit(callable);
try
{
while (!taskFuture.isDone())
System.out.println("waiting");
System.out.println(taskFuture.get());
} catch (ExecutionException ee)
{
System.err.println("task threw an exception");
System.err.println(ee);
} catch (InterruptedException ie)
{
System.err.println("interrupted while waiting");
}
executor.shutdownNow();
}
}
預設的主執行緒執行main()方法,這個方法包含一個執行器(executor),通過Excutors的newFixedThreadPool()方法來呼叫。在例項化Callable的介面時,它包含了一個匿名內部類,和通過提交任務給執行器(executor),接收一個Future例項的請求。
提交任務之後,一個執行緒將會執行相同的工作直到它的請求等到任務的結果返回。 我模擬這樣的工作,是主執行緒會一直處於等待的狀態,直到Future的例項屬性isDone()方法返回true.(在實際的應用中,我們應該避免這個迴圈。)這裡最關鍵的是,主執行緒呼叫get()的方法去獲取結果,然後輸出。之後,主執行緒會停止執行器(executor)。
Caution 當一個應用執行完之後,停止這個執行器是非常關鍵的。否則這個應用可能不能終止。在上面的例子中呼叫shutdownNow()方法(你也可以用shutdown()方法)。
callable的call()方法計算的公式是e = 1/0! + 1/1! + 1/2!...這個一系列的計算可以寫為1/n!,n的範圍是從0到無窮大。
在call()的方法中,我們例項化java.math.MathContext,這個封裝了一個計算的精確度和舍入的方式。我選擇了100為最高的限制 e的精確度,和我選擇HALF_UP為舍入方式。
Tip 提高精確度,最終的結果將會起來起接近於e的值。
call()方法中例項化一個java.math.BigDecimal的區域性變數result的例項值是BigDecimal.ZERO。它將放入到一個計算階梯因子的迴圈中,通過階梯因子分隔BigDecimal.ONE,和新增分隔結果給result。
divide()方法將MathContext例項作為第二個引數去明確舍入的資訊。(如果我指定作為精確度,那麼將會出現nonterminating decimal expansion,也就是說商數分隔的結果是不正確的,如0.33333…。實際會丟擲java.lang.ArithmeticException。執行器(executor)還會再次丟擲ExecutionException)。
執行上面的程式碼,輸入的結果可能是:
waiting waiting waiting waiting waiting waiting waiting waiting 2.718281828459045070516047795848605061178979635251032698900735004065225042504843314055887974344245741730039454062711 |