Java併發——Callable和Future
- Callable定義的方法是call( ),而Runnable定義的方法是run( )。
- Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
- Callable的call方法可丟擲受檢查的異常,而Runnable的run方法不能丟擲異常。
在工具類Executors中有一些工具方法可以把Runnable任務轉成Callable。你可以使用executor去執行一個Callable任務,也可以將Callable轉成FutureTask物件,然後交由執行緒去執行。
Future是非同步計算的結果,它描述了任務的生命週期,並提供了相關的方法來獲得任務執行的結果、取消任務以及檢查任務是否已經完成或者取消。
有多種方式可以建立一個Future。ExecutorService中的所有submit方法都會返回一個Future,利用這個返回的Future你可以獲取任務的執行結果,或者取消任務。可以顯示將Runnable或者Callable例項化一個FutureTask。
下面的例子演示了Callable和Future的一些方法,程式中定義了兩個任務c1和c2,並且模擬c2的執行時間是8秒左右,然後依次呼叫future的相關方法
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableAndFuture {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
Callable<Integer> c1 = new Target(false);
Callable<Integer> c2 = new Target(true);
Future<Integer> f1 = es.submit(c1);
Future<Integer> f2 = es.submit(c2);
int res = 0;
try {
res = f1.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
boolean isCancelled = f1.isCancelled();
boolean isDone = f1.isDone();
System.out.println(res);
System.out.println(isCancelled);
System.out.println(isDone);
System.out.println("---------------------------");
try {
boolean cancel = f2.cancel(true);
int res2 = f2.get();
isCancelled = f1.isCancelled();
isDone = f1.isDone();
System.out.println(res2);
System.out.println(cancel);
System.out.println(isCancelled);
System.out.println(isDone);
} catch (CancellationException e) {
// TODO Auto-generated catch block
System.out.println("任務被取消.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.out.println("任務被中斷.");
} catch (ExecutionException e) {
// TODO Auto-generated catch block
System.out.println("任務執行異常.");
}
}
}
class Target implements Callable<Integer> {
private boolean sleep = false;
public Target(boolean sleep) {
// TODO Auto-generated constructor stub
this.sleep = sleep;
}
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
if(sleep) {
Thread.sleep(8000);
}
int i = new Random().nextInt(1000);
return i;
}
}
任務的執行結果:
982
false
true
---------------------------
任務被取消.
Future介面的相關方法
cancel( )方法可以試圖取消任務的執行,如果當前任務已經完成、或已經被取消、或由於某些原因無法取消,則取消操作失敗,返回false;如果該任務尚未執行,呼叫cancel( )方法將會使該任務永不會執行;如果呼叫cancel( )方法時,該任務已經執行,那麼取決於引數boolean的值,如果是true,則表示立即中斷該任務的執行,否則,等待該執行的任務結束後,嘗試cancel並返回false。
isCancel( ),如果在任務正常完成前將其取消,那麼返回true,否則,返回false。
isDone( ) , 如果任務已完成,則返回true,由於正常終止、異常或取消而完成,也會返回true。
get( ) , 如果任務已經完成,那麼get會立即返回或者丟擲一個Exception,如果任務沒有完成,get會阻塞直到它完成。如果任務丟擲了異常,get會將該異常封裝為ExecutionException,然後重新丟擲;如果任務被取消,get會丟擲CancellationException。
FutureTask
FutureTask類相當於同時實現了Runnable和Future介面,提供了Future介面的具體實現,可以使用FutureTask去包裝Callable或Runnable任務。因為FutureTask實現了Runnable介面,所以可以將其交給Executor去執行,或者直接呼叫run( )去執行。
使用FutureTask的一個示例
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class MyFutureTask {
public static void main(String[] args) throws Exception {
Executor executor = Executors.newFixedThreadPool(5);
Callable<Integer> callable = new MyTarget();
FutureTask<Integer> ft = new FutureTask<>(callable);
executor.execute(ft);
System.out.println(ft.get());
// 直接呼叫run
// ft.run();
// System.out.println(ft.get());
System.out.println("-----------------------");
Runnable runnable = new MyRunnableTarget();
FutureTask<String> ft2 = new FutureTask<String>(runnable, "SUCCESS");
executor.execute(ft2);
System.out.println(ft2.get());
}
}
class MyTarget implements Callable<Integer> {
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
int i = new Random().nextInt(1000);
return i;
}
}
class MyRunnableTarget implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("Runnable is invoke...");
}
}
程式輸出:
280
-----------------------
Runnable is invoke...
SUCCESS
CompletionService
有時候我們需要利用executor去執行一批任務,每個任務都有一個返回值,利用Future就可以解決這個問題,為此我們需要儲存每個任務提交後的Future,然後依次呼叫get方法輪詢,獲得已經執行完畢的任務的結果,這樣的過程顯得無趣。我們希望一次提交一批任務後,executor執行結束也是返回給我們一個已經執行完畢的Future集合。
CompletionService整合了Executor和BlockingQueue的功能。你可以將一批Callable任務交給它去執行,然後使用類似於佇列中的take和poll方法,在結果完成時獲得這個結果,就像一個打包的Future。將生產新的非同步任務與使用已完成任務的結果分離開來的服務。生產者submit方法執行的任務。使用者 take 已完成的任務,並按照完成這些任務的順序處理它們的結果。ExecutorCompletionService類是一個實現了CompletionService介面的實現類,它將計算任務交給一個傳入的Executor去執行。
下面是一個ExecutorCompletionService的示例
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
public class TestCompletionService {
private class Target implements Callable<Integer> {
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
int i = new Random().nextInt(1000);
return i;
}
}
public static void main(String[] args) throws Exception {
Executor executor = Executors.newFixedThreadPool(5);
ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(executor);
Callable<Integer> c1 = new TestCompletionService().new Target();
Callable<Integer> c2 = new TestCompletionService().new Target();
Callable<Integer> c3 = new TestCompletionService().new Target();
ecs.submit(c1);
ecs.submit(c2);
ecs.submit(c3);
System.out.println(ecs.take().get());
System.out.println(ecs.poll().get());
System.out.println(ecs.take().get());
}
}
這樣將Future分離開來,已經完成的任務的Future就會被加入到BlockingQueue中供使用者直接獲取。
關於poll方法和get方法的區別,poll方法是非阻塞的,有則返回,無則返回NULL,take方法是阻塞的,沒有的話則會等待。
批處理與任務執行時限
在有些應用場景中,我們需要同時處理多個任務,並獲取結果,使用上面的CompletionService將完成的任務與未完成的任務分隔開似乎能夠解決,但是如果其中有一個任務相當耗時,就會影響整個批處理任務的完成速度。比如,在一個頁面中,我們需要從多個數據源獲取資料,並在頁面展示,同時我們希望整個頁面的載入過程不超過2秒,那麼那些超過2秒沒有響應成功的資料來源資料則用預設值替換,ExecutorService提供了invokeAll( )來完成這個任務。
下面我們通過一個示例演示invokeAll方法,程式中定義了3個任務,c1、c2、c3模擬執行時間分別為1、2、3秒,程式允許的最大執行時間是2秒,超過2秒的任務就會被取消。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class TestCompletionService {
private class Target implements Callable<Integer> {
private int a = 0;
public Target(int a) {
// TODO Auto-generated constructor stub
this.a = a;
}
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
Thread.sleep(1000*a);
return a;
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
Callable<Integer> c1 = new TestCompletionService().new Target(1);
Callable<Integer> c2 = new TestCompletionService().new Target(2);
Callable<Integer> c3 = new TestCompletionService().new Target(3);
List<Callable<Integer>> list = new ArrayList<>();
list.add(c1);
list.add(c2);
list.add(c3);
try {
List<Future<Integer>> res = es.invokeAll(list, 2, TimeUnit.SECONDS);
Iterator<Future<Integer>> it = res.iterator();
while(it.hasNext()) {
Future<Integer> f = it.next();
int i = f.get();
System.out.println(i);
}
} catch (CancellationException e ) {
System.out.println("任務取消");
} catch (InterruptedException e) {
System.out.println("中斷異常");
} catch (ExecutionException e) {
System.out.println("執行異常");
}
}
}
程式的輸出:
1
2
任務取消
需要注意的是,java.util.concurrent中所有的關於時間的方法都將負數作為0處理,不需要額外的處理