1. 程式人生 > >Java中的Runnable、Callable、Future、FutureTask的區別和CompletionService的使用場景

Java中的Runnable、Callable、Future、FutureTask的區別和CompletionService的使用場景

Java中存在Runnable、Callable、Future、FutureTask這幾個與執行緒相關的類或者介面,在Java中也是比較重要的幾個概念,我們通過下面的簡單示例來了解一下它們的作用於區別。

Runnable

其中Runnable應該是我們最熟悉的介面,它只有一個run()函式,用於將耗時操作寫在其中,該函式沒有返回值。然後使用某個執行緒去執行該runnable即可實現多執行緒,Thread類在呼叫start()函式後就是執行的是Runnable的run()函式。Runnable的宣告如下 :

@FunctionalInterface
public interface
Runnable {
/** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see
java.lang.Thread#run() */
public abstract void run(); }

Callable

Callable與Runnable的功能大致相似,Callable中有一個call()函式,但是call()函式有返回值,而Runnable的run()函式不能將結果返回給客戶程式。Callable的宣告如下 :

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return
computed result * @throws Exception if unable to compute a result */
V call() throws Exception; }

可以看到,這是一個泛型介面,call()函式返回的型別就是客戶程式傳遞進來的V型別。

Future

Executor就是Runnable和Callable的排程容器,Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果、設定結果操作。get方法會阻塞,直到任務返回結果(Future簡介)。Future宣告如下:

* @see FutureTask
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface Future<V> {

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}.  Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed normally;
     * {@code true} otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns {@code true} if this task was cancelled before it completed
     * normally.
     *
     * @return {@code true} if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask
FutureTask則是一個RunnableFuture< V>,而RunnableFuture實現了Runnbale又實現了Futrue< V>這兩個介面:

public class FutureTask<V> implements RunnableFuture<V> {
......
}

RunnableFuture

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

另外FutureTask還可以包裝Runnable和Callable< V>, 由建構函式注入依賴。

/**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

可以看到,Runnable注入會被Executors.callable()函式轉換為Callable型別,即FutureTask最終都是執行Callable型別的任務。該適配函式的實現如下 :

/**
     * Returns a {@link Callable} object that, when
     * called, runs the given task and returns the given result.  This
     * can be useful when applying methods requiring a
     * {@code Callable} to an otherwise resultless action.
     * @param task the task to run
     * @param result the result to return
     * @param <T> the type of the result
     * @return a callable object
     * @throws NullPointerException if task null
     */
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

RunnableAdapter介面卡

 /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

由於FutureTask實現了Runnable,因此它既可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行。並且還可以直接通過get()函式獲取執行結果,該函式會阻塞,直到結果返回。

因此FutureTask既是Future、Runnable,又是包裝了Callable(如果是Runnable最終也會被轉換為Callable ), 它是這兩者的合體。

完整示例:

package com.stay4it.rx;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FutureTest {

    public static class Task implements Runnable {

        @Override
        public void run() {
            // TODO Auto-generated method stub
            System.out.println("run");
        }

    }
    public static class Task2 implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            System.out.println("call");
            return fibc(30);
        }

    }

     /** 
     * runnable, 無返回值 
     */  
    public static void testRunnable(){
        ExecutorService executorService = Executors.newCachedThreadPool();

        Future<String> future = (Future<String>) executorService.submit(new Task());
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        executorService.shutdown();
    }

    /** 
     * Callable, 有返回值 
     */  
    public static void testCallable(){
        ExecutorService executorService = Executors.newCachedThreadPool();

        Future<Integer> future = (Future<Integer>) executorService.submit(new Task2());
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        executorService.shutdown();
    }

     /** 
     * FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個介面, 
     * 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable 
     * <V>,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行 
     * ,並且還可以通過v get()返回執行結果,線上程體沒有執行完成的時候,主執行緒一直阻塞等待,執行完則直接返回結果。 
     */  
    public static void testFutureTask(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task2());

        executorService.submit(futureTask);
        try {
            System.out.println(futureTask.get());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        executorService.shutdown();
    }

     /** 
     * FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個介面, 
     * 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable 
     * <V>,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行 
     * ,並且還可以通過v get()返回執行結果,線上程體沒有執行完成的時候,主執行緒一直阻塞等待,執行完則直接返回結果。 
     */  
    public static void testFutureTask2(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                System.out.println("testFutureTask2 run");
            }
        },fibc(30));

        executorService.submit(futureTask);
        try {
            System.out.println(futureTask.get());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        executorService.shutdown();
    }



    public static void main(String[] args) {

        testCallable();

    }

    /** 
     * 效率低下的斐波那契數列, 耗時的操作 
     *  
     * @param num 
     * @return 
     */  
    static int fibc(int num) {  
        if (num == 0) {  
            return 0;  
        }  
        if (num == 1) {  
            return 1;  
        }  
        return fibc(num - 1) + fibc(num - 2);  
    }  

}

CompletionService

《Java併發程式設計實踐》中關於CompletionService的描述如下:

如果向Executor提交了一組計算任務,並且希望在計算完成後獲得結果,那麼可以保留與每個任務關聯的Future,然後反覆使用get方法,同時將引數timeout指定為0,從而通過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionService。

CompleteService介面是為了方便多個任務執行時,可以方便得獲取到執行任務的Future結果。介面內容如下:

public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

這五個方法分為兩大方面。一個是對Callable和Runnable型別引數的任務提交,另一方面則是嘗試對結果以不同的方式進行獲取,take()方法一般是阻塞式的獲取,後兩者則更靈活。

通常來講,CompleteService是要和Executor結合在一起使用的。

ExecutorCompletionService

在JDK中,ExecutorCompletionService是CompletionService介面的唯一實現類。
這個實現類主要做的事就是將執行完成的任務結果放到阻塞佇列中,這樣等待結果的執行緒,如果執行take()方法會得到結果並恢復執行。

ExecutorCompletionService有3個屬性:

  • AbstractExecutorService類的物件aes
  • Executor類的物件executor
  • BlockingQueue<Future<V>>的completionQueue

通常,如果executor是AbstractExecutorService的一個實現,則將其賦值給aes屬性,否則賦值為null。

在這個類中,executor負責執行任務,而aes則負責做適配處理,返回包裝好任務的FutureTask物件。

這裡面有一個對於實現功能很重要的內部類QueueingFuture,實現如下:

/**
  * FutureTask extension to enqueue upon completion
  */
 private class QueueingFuture extends FutureTask<Void> {
     QueueingFuture(RunnableFuture<V> task) {
         super(task, null);
         this.task = task;
     }
     protected void done() { completionQueue.add(task); }
     private final Future<V> task;
 }

QueueingFuture是FutureTask的一個子類,通過擴充套件該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。

而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。

public Future<V> take() throws InterruptedException {  
    return completionQueue.take();  
}  

public Future<V> poll() {  
    return completionQueue.poll();  
}  

下面我們通過例子來體驗下CompletionService的好處與使用場景。

首先定義一個實現了Callable介面的Task類:

private static class Task implements Callable<String>{  

        private volatile int i;  

        public Task(int i){  
            this.i = i;  
        }  

        @Override  
        public String call() throws Exception {  
            Thread.sleep(1000);  
            System.out.println(Thread.currentThread().getName());  
            return "任務 : " + i;  
        }  

    }

1)自己維護一個list儲存submit的callable task所返回的Future物件。在主執行緒中遍歷集合並呼叫Future的get()方法取到Task的返回值。如下程式碼所示:

public static void testFuture() throws InterruptedException, ExecutionException {  
        System.out.println("main Thread begin:");  
        ExecutorService executor = Executors.newCachedThreadPool();  
        List<Future<String>> result = new ArrayList<Future<String>>();  
        for (int i = 0;i<10;i++) {  
            Future<String> submit = executor.submit(new Task(i));  
            result.add(submit);  
        }  
        executor.shutdown();  
        for (int i = 0;i<10;i++) {//一個一個等待返回結果  
            System.out.println(result.get(i).get());  
        }  
        System.out.println("main Thread end:");  
    }

輸出結果如下:

main Thread begin:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
任務 : 0
任務 : 1
任務 : 2
pool-1-thread-10
pool-1-thread-6
pool-1-thread-9
pool-1-thread-8
pool-1-thread-7
pool-1-thread-4
pool-1-thread-5
任務 : 3
任務 : 4
任務 : 5
任務 : 6
任務 : 7
任務 : 8
任務 : 9
main Thread end:

從輸出結果可以看出,我們只能一個一個阻塞的取出。這中間肯定會浪費一定的時間在等待上。如5返回了,但是前面的1-4都沒有返回,那麼5就得等1-4輸出才能輸出。

2)通過CompletionService包裝ExecutorService,然後呼叫其take()方法去取Future物件。如下程式碼所示:

private static void testCompletionService() throws InterruptedException, ExecutionException {
        System.out.println("main Thread begin:");  
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        for(int i=0;i<10;i++){  
            completionService.submit(new Task(i));  
        }
        executor.shutdown();
        for(int i=0;i<10;i++){  
            System.out.println(completionService.take().get());  
        }
        System.out.println("main Thread end:");  
    }

輸出結果如下:

main Thread begin:
pool-1-thread-9
pool-1-thread-7
pool-1-thread-5
pool-1-thread-6
pool-1-thread-3
pool-1-thread-4
任務 : 8
任務 : 5
任務 : 4
任務 : 6
pool-1-thread-2
pool-1-thread-10
pool-1-thread-1
pool-1-thread-8
任務 : 2
任務 : 3
任務 : 1
任務 : 9
任務 : 0
任務 : 7
main Thread end:

可以看出,結果的輸出和執行緒的放入順序無關係。每一個執行緒執行成功後,立刻就輸出。如5返回了,不管前面的1-4有沒有返回,5馬上輸出,不管它們加入執行緒池的順序,從而節省時間。

總結

  1. 自己建立一個集合來儲存Future存根並迴圈呼叫其返回結果的時候,主執行緒並不能保證首先獲得的是最先完成任務的執行緒返回值。它只是按加入執行緒池的順序返回。因為take方法是阻塞方法,後面的任務完成了,前面的任務卻沒有完成,主程式就那樣等待在那兒,只到前面的完成了,它才知道原來後面的也完成了。
  2. 使用CompletionService來維護處理執行緒的返回結果時,主執行緒總是能夠拿到最先完成的任務的返回值,而不管它們加入執行緒池的順序。
  3. CompletionService的實現是維護了一個儲存Future的BlockingQueque。只有當這個Future的任務狀態是結束的時候,才會加入到這個Queque中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future物件,如果Queue是空的,就會阻塞在那裡,直到有完成的Future物件加入到Queue中。也就是先完成的必定先被取出,這樣就減少了不必要的等待時間。