Java中的Runnable、Callable、Future、FutureTask的區別和CompletionService的使用場景
Java中存在Runnable、Callable、Future、FutureTask這幾個與執行緒相關的類或者介面,在Java中也是比較重要的幾個概念,我們通過下面的簡單示例來了解一下它們的作用於區別。
Runnable
其中Runnable應該是我們最熟悉的介面,它只有一個run()函式,用於將耗時操作寫在其中,該函式沒有返回值。然後使用某個執行緒去執行該runnable即可實現多執行緒,Thread類在呼叫start()函式後就是執行的是Runnable的run()函式。Runnable的宣告如下 :
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
Callable
Callable與Runnable的功能大致相似,Callable中有一個call()函式,但是call()函式有返回值,而Runnable的run()函式不能將結果返回給客戶程式。Callable的宣告如下 :
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以看到,這是一個泛型介面,call()函式返回的型別就是客戶程式傳遞進來的V型別。
Future
Executor就是Runnable和Callable的排程容器,Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果、設定結果操作。get方法會阻塞,直到任務返回結果(Future簡介)。Future宣告如下:
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask
FutureTask則是一個RunnableFuture< V>,而RunnableFuture實現了Runnbale又實現了Futrue< V>這兩個介面:
public class FutureTask<V> implements RunnableFuture<V> {
......
}
RunnableFuture
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
另外FutureTask還可以包裝Runnable和Callable< V>, 由建構函式注入依賴。
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看到,Runnable注入會被Executors.callable()函式轉換為Callable型別,即FutureTask最終都是執行Callable型別的任務。該適配函式的實現如下 :
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
RunnableAdapter介面卡
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
由於FutureTask實現了Runnable,因此它既可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行。並且還可以直接通過get()函式獲取執行結果,該函式會阻塞,直到結果返回。
因此FutureTask既是Future、Runnable,又是包裝了Callable(如果是Runnable最終也會被轉換為Callable ), 它是這兩者的合體。
完整示例:
package com.stay4it.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class FutureTest {
public static class Task implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("run");
}
}
public static class Task2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("call");
return fibc(30);
}
}
/**
* runnable, 無返回值
*/
public static void testRunnable(){
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = (Future<String>) executorService.submit(new Task());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
executorService.shutdown();
}
/**
* Callable, 有返回值
*/
public static void testCallable(){
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Integer> future = (Future<Integer>) executorService.submit(new Task2());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
executorService.shutdown();
}
/**
* FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個介面,
* 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable
* <V>,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行
* ,並且還可以通過v get()返回執行結果,線上程體沒有執行完成的時候,主執行緒一直阻塞等待,執行完則直接返回結果。
*/
public static void testFutureTask(){
ExecutorService executorService = Executors.newCachedThreadPool();
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task2());
executorService.submit(futureTask);
try {
System.out.println(futureTask.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
executorService.shutdown();
}
/**
* FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個介面,
* 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable
* <V>,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行
* ,並且還可以通過v get()返回執行結果,線上程體沒有執行完成的時候,主執行緒一直阻塞等待,執行完則直接返回結果。
*/
public static void testFutureTask2(){
ExecutorService executorService = Executors.newCachedThreadPool();
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("testFutureTask2 run");
}
},fibc(30));
executorService.submit(futureTask);
try {
System.out.println(futureTask.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
executorService.shutdown();
}
public static void main(String[] args) {
testCallable();
}
/**
* 效率低下的斐波那契數列, 耗時的操作
*
* @param num
* @return
*/
static int fibc(int num) {
if (num == 0) {
return 0;
}
if (num == 1) {
return 1;
}
return fibc(num - 1) + fibc(num - 2);
}
}
CompletionService
《Java併發程式設計實踐》中關於CompletionService的描述如下:
如果向Executor提交了一組計算任務,並且希望在計算完成後獲得結果,那麼可以保留與每個任務關聯的Future,然後反覆使用get方法,同時將引數timeout指定為0,從而通過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionService。
CompleteService介面是為了方便多個任務執行時,可以方便得獲取到執行任務的Future結果。介面內容如下:
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
這五個方法分為兩大方面。一個是對Callable和Runnable型別引數的任務提交,另一方面則是嘗試對結果以不同的方式進行獲取,take()方法一般是阻塞式的獲取,後兩者則更靈活。
通常來講,CompleteService是要和Executor結合在一起使用的。
ExecutorCompletionService
在JDK中,ExecutorCompletionService是CompletionService介面的唯一實現類。
這個實現類主要做的事就是將執行完成的任務結果放到阻塞佇列中,這樣等待結果的執行緒,如果執行take()方法會得到結果並恢復執行。
ExecutorCompletionService有3個屬性:
- AbstractExecutorService類的物件aes
- Executor類的物件executor
BlockingQueue<Future<V>>
的completionQueue
通常,如果executor是AbstractExecutorService的一個實現,則將其賦值給aes屬性,否則賦值為null。
在這個類中,executor負責執行任務,而aes則負責做適配處理,返回包裝好任務的FutureTask物件。
這裡面有一個對於實現功能很重要的內部類QueueingFuture,實現如下:
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
QueueingFuture是FutureTask的一個子類,通過擴充套件該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。
而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
下面我們通過例子來體驗下CompletionService的好處與使用場景。
首先定義一個實現了Callable介面的Task類:
private static class Task implements Callable<String>{
private volatile int i;
public Task(int i){
this.i = i;
}
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName());
return "任務 : " + i;
}
}
1)自己維護一個list儲存submit的callable task所返回的Future物件。在主執行緒中遍歷集合並呼叫Future的get()方法取到Task的返回值。如下程式碼所示:
public static void testFuture() throws InterruptedException, ExecutionException {
System.out.println("main Thread begin:");
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<String>> result = new ArrayList<Future<String>>();
for (int i = 0;i<10;i++) {
Future<String> submit = executor.submit(new Task(i));
result.add(submit);
}
executor.shutdown();
for (int i = 0;i<10;i++) {//一個一個等待返回結果
System.out.println(result.get(i).get());
}
System.out.println("main Thread end:");
}
輸出結果如下:
main Thread begin:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
任務 : 0
任務 : 1
任務 : 2
pool-1-thread-10
pool-1-thread-6
pool-1-thread-9
pool-1-thread-8
pool-1-thread-7
pool-1-thread-4
pool-1-thread-5
任務 : 3
任務 : 4
任務 : 5
任務 : 6
任務 : 7
任務 : 8
任務 : 9
main Thread end:
從輸出結果可以看出,我們只能一個一個阻塞的取出。這中間肯定會浪費一定的時間在等待上。如5返回了,但是前面的1-4都沒有返回,那麼5就得等1-4輸出才能輸出。
2)通過CompletionService包裝ExecutorService,然後呼叫其take()方法去取Future物件。如下程式碼所示:
private static void testCompletionService() throws InterruptedException, ExecutionException {
System.out.println("main Thread begin:");
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
for(int i=0;i<10;i++){
completionService.submit(new Task(i));
}
executor.shutdown();
for(int i=0;i<10;i++){
System.out.println(completionService.take().get());
}
System.out.println("main Thread end:");
}
輸出結果如下:
main Thread begin:
pool-1-thread-9
pool-1-thread-7
pool-1-thread-5
pool-1-thread-6
pool-1-thread-3
pool-1-thread-4
任務 : 8
任務 : 5
任務 : 4
任務 : 6
pool-1-thread-2
pool-1-thread-10
pool-1-thread-1
pool-1-thread-8
任務 : 2
任務 : 3
任務 : 1
任務 : 9
任務 : 0
任務 : 7
main Thread end:
可以看出,結果的輸出和執行緒的放入順序無關係。每一個執行緒執行成功後,立刻就輸出。如5返回了,不管前面的1-4有沒有返回,5馬上輸出,不管它們加入執行緒池的順序,從而節省時間。
總結
- 自己建立一個集合來儲存Future存根並迴圈呼叫其返回結果的時候,主執行緒並不能保證首先獲得的是最先完成任務的執行緒返回值。它只是按加入執行緒池的順序返回。因為take方法是阻塞方法,後面的任務完成了,前面的任務卻沒有完成,主程式就那樣等待在那兒,只到前面的完成了,它才知道原來後面的也完成了。
- 使用CompletionService來維護處理執行緒的返回結果時,主執行緒總是能夠拿到最先完成的任務的返回值,而不管它們加入執行緒池的順序。
- CompletionService的實現是維護了一個儲存Future的BlockingQueque。只有當這個Future的任務狀態是結束的時候,才會加入到這個Queque中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future物件,如果Queue是空的,就會阻塞在那裡,直到有完成的Future物件加入到Queue中。也就是先完成的必定先被取出,這樣就減少了不必要的等待時間。