Java併發包:ExecutorService和ThreadPoolExecutor
ExecutorService
Java.util.concurrent.ExecutorService介面代表一種非同步執行機制,它能夠在後臺執行任務。因此ExecutorService與thread pool是非常相似的。事實上,在java.util.package包中ExecutorService的具體實現就是一個執行緒池的具體實現。
ExcutorService 例子
下面是一個簡單的例子
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
首先,通過newFixedThreadPool()工廠方法建立一個ExecutorService的例項。這個方法建立了一個可以有10個執行緒執行任務的執行緒池。
第二,Runnable介面的匿名實現類作為引數被傳遞給execute()方法。Runable將會被ExecutorService中的一個執行緒來執行。
任務委託(Task Delegation)
下面的圖片說明了一個執行緒委託一個任務給ExecutorService進行非同步執行:
一旦,執行緒委託任務給ExecutorService,執行緒會獨立任務的執行而繼續自己之後的操作。
ExcutorService的使用說明
下面是委託任務給ExecutorService的一些不同的方式:
- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(…)
- invokeAll(…)
下面來逐個看看這些方法。
- execute(Runnable)
execute(Runnable) 方法接受一個java.lang.Runable物件的例項,並非同步執行之。下面是一個使用ExecutorService執行Runnable的例子:
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown();
這種方式不能獲得Runnable執行的結果,如果有這種需要,你將要使用Callable。
- submit(Runnable)
submit(Runnable) 方法也接收一個Runnable介面的具體實現,並返回一個Future物件。Future物件可以用來檢測Runable是否執行完成。
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); //returns null if the task has finished correctly.
- submit(Callable)
submit(Callable)方法與submit(Runnable)方法相似,除了接收的引數有所不同。Callable例項非常類似於Runnable,不同的是call方法可以返回一個結果,Runnable.run()方法不能返回一個結果。
可以通過submit(Callable)方法返回的Future物件獲取Callable的結果。下面是一個使用Callable的例子:
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
上面程式碼的輸出結果是:
Asynchronous Callable
future.get() = Callable Result
- invokeAny(…)
invokeAny()方法接收一個Callable物件或者Callable的子介面例項的集合作為引數,這個方法不會返回Future,但會返回集合中某一個Callable的結果。你不能確定你得到是哪個Callable的結果。只是已執行完成的Callable中的一個。
如果一個任務已經完成(或者丟擲了異常),剩餘的Callable任務將被取消。
下面是示例程式碼:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();
示例程式碼將會列印給定的Callable集合中一個Callable任務返回的結果。我嘗試執行了多次,結果是變化的。有時候是“Task1”,有時候是“Task 2”等。
- invokeAll(…)
invokeAll()接收一個Callable物件的集合作為引數,該方法會呼叫你傳給他的集合中的所有Callable物件。invokeAll()會返回一個Future物件的列表,通過這個列表你可以獲取每一個Callable執行的結果。
記住一個任務可能會因為一個異常而結束,因此這時任務並不是真正意義上執行成功了。這在Future上是沒有辦法來判斷的。
下面是示例程式碼:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
List<Future<String>> futures = executorService.invokeAll(callables);
for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}
executorService.shutdown();
ExecutorService Shutdown
當你是使用完ExecutorService後,你應該關閉它,使得執行緒不能持續執行。例如,你的應用程式從main()方法開始並且你的主執行緒退出應用程式,這時如果存在啟用狀態的ExecutorService,你的應用程式將仍然會保持執行。ExecutorService中啟用的執行緒會阻止JVM關閉。
為了終止ExecutorService中的執行緒,你需要呼叫shutdown()方法。ExecutorService不會立即關閉,但是它也不會接受新的任務,直到它裡面的所有執行緒都執行完畢,ExecutorService才會關閉。所有提交到ExecutorService中的任務會在呼叫shutdown()方法之前被執行。
如果你想立即關閉ExecutorService,你可以呼叫shutdownNow()方法。這將會嘗試立即停止所有正在執行的任務,並且忽略所有提交的但未被處理的任務。對於正在執行的任務是不能確定的,也許它們停止了,也行它們執行直到結束。
ThreadPoolExecutor
Java.util.concurrent.ThreadPoolExecutor類是ExecutorSerivce介面的具體實現。ThreadPoolExecutor使用執行緒池中的一個執行緒來執行給定的任務(Runnable或者Runnable)。
ThreadPoolExecutor內部的執行緒池包含不定數量的執行緒。池中執行緒的數量由下面的這些變數決定:
- corePoolSize
- maximumPoolSize
當一個任務委託給執行緒池執行,此時如果池執行緒中執行緒數少於corePoolSize,即使池中有空閒的執行緒,執行緒池中也會建立一個新的執行緒。
如果任務佇列是滿的,corePoolSize個執行緒或者更多的且少於maximumPoolSize的執行緒正在執行,也會建立一個新的執行緒來執行任務。
下面圖釋ThreadPoolExecutor這種原理:
建立ThreadPoolExecutor
ThreadPoolExecutor有多種建構函式。例如:
int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
除非你需要顯示的給ThreadPoolExecutor指定這些引數,通常使用java.util.concurrent.Executor類中的工廠方法來建立例項。