1. 程式人生 > >五、Future&Callable

五、Future&Callable

五、Future&Callable

5.1 Introduction :Runnable & Callable 區別:

1)Runnable沒有返回值,Future & Callable 使執行緒具有返回值的功能。
2)Callable介面的call() 方法可以宣告丟擲異常,Runnable介面的run() 方法不可以丟擲異常,只能try catch
執行完callable介面中的任務,返回值是用future介面獲取的。
注意: callable()是一個函式式介面!!!而Future不是,Future有FutureTask這些比較常見的實現類

看下Callable的定義(已經將區別和聯絡說的很清楚了):
在這裡插入圖片描述

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *(計算一個結果,或者在出錯時甩出一個異常)
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

比較有意思的一點是 ,假如Callable 的泛型傳入的是Void,那麼Callable其實和Runnable一樣了,當然這樣的寫法其實有點無聊。

Callable<Void> callable = new Callable<Void>() {
    @Override
     public Void call() throws Exception {
         return null;
     }
 };

5.2 Future類

這是Future介面結構:

public interface Future<V>
1、A Future represents the result of an asynchronous computation.  
(Future代表一個非同步運算的結果)

2、Methods are provided to check if the  computation is complete,
to wait for its completion, and to retrieve the result of the computation. 
(Future中提供了API來檢查計算是否完成、等待計算完成、獲取計算結果)

3、The   result can only be retrieved using method get when the computation has completed, 
blocking if necessary until it is ready. Cancellation is performed by the cancel method.
(Future的結果只能在計算完成後拿到,在計算完之前一直阻塞,通過Cancel方法還可以取消計算任務)

4、 Additional methods are provided to determine if the task completed normally or was cancelled.
 Once a computation has completed, the computation cannot be cancelled. If you would like to 
 use a Future for the sake of cancellability but not provide a usable result, you can declare types of the 
 form Future<?> and return null as a result of the underlying task.
(Future提供API來判斷任務是正常完成還是取消了。已經完成計算的任務是不能取消的。
如果使用Future是為了利用這個類的“可取消性”,而不是利用它能返回結果的特性,那你可以將Future的
泛型宣告為 ?,並且將這個任務返回值設為null)

下面是一種場景:
Sample Usage (Note that the following classes are all made-up.)
   interface ArchiveSearcher { String search(String target); }
  class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    void showSearch(final String target)
        throws InterruptedException {
      Future<String> future
        = executor.submit(new Callable<String>() {
          public String call() {
              return searcher.search(target);
          }});
          // 下面一行:子執行緒在 search,父執行緒也在計算,實際上做到了並行化
      displayOtherThings(); // do other things while searching
      try {
        displayText(future.get()); // use future
      } catch (ExecutionException ex) { cleanup(); return; }
    }
  }

也可以用 FutureTask 來替代上面的寫法:
The FutureTask class is an implementation of Future that implements Runnable, and so may be 
executed by an Executor. For example, the above construction with submit could be replaced by:

   FutureTask<String> future =
    new FutureTask<String>(new Callable<String>() {
      public String call() {
        return searcher.search(target);
    }});
  executor.execute(future);

// 記憶體連續性效應:
Memory consistency effects: Actions taken by the asynchronous computation happen-before 
actions following the corresponding Future.get() in another thread.

5.3 get() 、isDone()

future.get() 是阻塞方法
future.isDone()不阻塞,不過要注意下用法,如官方註釋所言。

Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation – in all of these cases, this method will return true.

這個方法只是判斷任務是不是完成了。那什麼叫完成呢?
正常的執行完、丟擲異常、被取消了 --> 三種情況都叫已完成,所以不要誤以為只有normal termination才是已完成。

看個十分簡單的demo:

class MyCallable implements Callable<String> {
    private int age;

    public MyCallable(int age) {
        this.age = age;
    }

    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(3);
        return " age = " + age;
    }
}
public class MyFutureCallable {

    public static void main(String[] args) {
        MyCallable myCallable = new MyCallable(10);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<String> future = executorService.submit(myCallable);
        try {
            System.out.println(" begins at " + DateUtil.currentTime());
            System.out.println(future.get());
            System.out.println(" ends at " + DateUtil.currentTime());
            //等待時間剛好 3 秒
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

5.4 ExecutorService.submit(Runnable, T result)

在這裡插入圖片描述

ExecutorService.submit()有三種形式的傳參。
注意:submit(Runnable ,T result)這種傳參,result可作為執行結果的返回值,不需要用get()獲取了。

Future 是個 interface,下面程式碼實際執行者是 FutureTask

public class MyFutureCallable {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Person person = new Person();
        System.out.println(" person ,in the beginning " + person); //  Person(age=0, name=null)
        Future<Person> future = executorService.submit(new PersonRunnable(person), person);

        System.out.println(" person ,in the middle " + person); //  Person(age=0, name=null)

        // 說明:在執行future.get()後,才將計算後的T 賦值給了傳參的 T result。
        Person person1 = future.get();
        System.out.println(" person ,after the submit " + person); //  Person(age=10, name=ht)
        System.out.println(person == person1);
        System.out.println(person.hashCode() == person1.hashCode());
        // true 表示是同一個person物件

        executorService.shutdown();
    }
}

@AllArgsConstructor
class PersonRunnable implements Runnable{
    private Person person;

    @Override
    public void run() {
        person.setAge(10);
        person.setName("ht");
    }
}
@Data
class Person{
    private int age;
    private String name;
}

5.5 cancel(boolean mayInterruptIfRunning) & isCancelled()

根據官方文件呢,是這樣解釋的:
public abstract boolean cancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started when cancel is called, this task should never run. If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.
After this method returns, subsequent calls to isDone will always return true. Subsequent calls to isCancelled will always return true if this method returned true.

而關於這個傳參:
mayInterruptIfRunning - true if the thread executing this task should be interrupted; otherwise, in-progress tasks are allowed to complete(true–>試圖中斷任務,false–>等任務執行完)

如何理解?
1、本API只是“試圖”取消任務
2、“試圖”而已,未必成功。在以下條件下就會“試圖”失敗:

  • a.任務已經完成了
  • b.任務已被取消過了
  • c. 由於某種原因取消不了了

3、尚未執行的任務若被能呼叫本方法,任務就不會執行了
4、若任務已經開始,那麼傳參mayInterruptIfRunning將會決定任務執行緒是否被置為interrupted狀態,以期能成功中止任務
5、cancel()返回之後呼叫isDone ()總是返回true;如果cancel()返回true,再呼叫isCancelled ()總是返回true。

用大白話轉述一下:
方法Future.cancel(boolean mayInterruptAfterRunning)返回值代表傳送取消任務的命令是否已經成功完成。(注: 不是 代表是否已經成功取消,而是 “傳送” 這個命令是否完成)(注:Future.cancel 而不是 ExecutorService.cancel!

那這個API有什麼樣的使用場景呢?
若執行緒正在執行,可以在程式碼中結合使用if(Thread.currentThread.isInterrupted)來確定是否已有中斷狀態,並通過丟擲異常來強制中止執行緒。

public class MyCallableDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<String> callable = () -> {
            try { // 注意一定要try catch 這個異常,主執行緒是無法列印這個異常的
                while (true) {
                    if (Thread.currentThread().isInterrupted()) {
                        throw new InterruptedException("中斷了!");
                    }
                    System.out.println("假如中斷了,就不執行了");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return " callable ";
        };
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        Future<String> future = executorService.submit(callable);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("attempt to cancel the mission --> " + future.cancel(true)
                + "; has cancelled ? --> " + future.isCancelled());
        executorService.shutdown();
    }

}
/* ===輸出結果===
java.lang.InterruptedException: 中斷了!
	at juc.MyCallableDemo.lambda$main$0(MyCallableDemo.java:18)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
假如中斷了,就不執行了
attempt to cancel the mission --> true; has cancelled ? --> true
 */

5.6 get(long timeout,TimeUnit unit)

在指定的最大時間裡等待返回值,get()是阻塞的,timeout的時間內拿不到結果,
throw timeoutException()

5.7 handle exceptions

關於Callable任務中的異常處理, 先來看這個小小的demo:

public class MyCallableDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<String> callable = () -> {
            System.out.println(" here begins the task ");
            int i = 1 / 0;  //此處丟擲了異常,但是主執行緒並不感知
            System.out.println(" here ends the task ");
            return "callable";
        };

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<String> future = executorService.submit(callable);
//        future.get();   // 如果本行被註釋了,則main  thread 不會輸出異常
        executorService.shutdown();
        System.out.println(" main thread ends here ");
    }
}
    /* output ::
    main thread ends here
    here begins the task

     */

這個簡單的demo,其實也就應了文章最初的說明:子執行緒的異常應該在子執行緒中去處理,而不應委託給父執行緒(父執行緒感知不到子執行緒的異常)

但這種說法不全對,再來看:

public class MyCallableDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<String> callable = () -> {
            System.out.println(" here begins the task ");
            int i = 1 / 0;  //此處丟擲了異常,干擾了主執行緒的正常執行
            System.out.println(" here ends the task ");
            return "callable";
        };

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<String> future = executorService.submit(callable);
        future.get(); // 此處丟擲了異常,後面兩句都不會再執行
        executorService.shutdown(); // 此處不執行
        System.out.println(" main thread ends here "); // 此處不執行 
    }
}
    /* output ::
  here begins the task 
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at juc.MyCallableDemo.main(MyCallableDemo.java:23)
Caused by: java.lang.ArithmeticException: / by zero
	at juc.MyCallableDemo.lambda$main$0(MyCallableDemo.java:16)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

     */

當執行到future.get()時,正常情況下應獲取一個返回值,但是由於在子執行緒中丟擲了異常,這個異常通過future.get()傳遞到了主執行緒,影響到了主執行緒的正常執行,尤其要注意的是executorService.shutdown();這樣的關閉執行緒池資源的操作也沒有執行!

所以呢,best practice應該是這樣的:

public class MyCallableDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<String> callable = () -> {
            try {
                System.out.println(" here begins the task ");
                int i = 1 / 0;
                System.out.println(" here ends the task ");
            } catch (Exception e) {
                e.printStackTrace();
//                throw e; // 可具體視情況要不要再丟擲去
            }
            return "callable";
        };

        ExecutorService executorService = null;
        try {
            executorService = Executors.newFixedThreadPool(2);
            Future<String> future = executorService.submit(callable);
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            if (executorService != null) {
                executorService.shutdown();
            }
        }
        System.out.println(" main thread ends here ");
    }
}
    /* output ::
   here begins the task 
 main thread ends here 
java.lang.ArithmeticException: / by zero
	at juc.MyCallableDemo.lambda$main$0(MyCallableDemo.java:17)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
     */

怎麼去理解這段程式碼呢?
總的來說就是“子執行緒的異常讓子執行緒自己去處理,至於捕獲後要不要重新丟擲讓父執行緒再感知到,需要視具體場景而定了”。

5.8 自定義拒絕策略(RejectedExecutionHandler)

介面:RejectedExecutionHandler --> 執行緒池關閉依然有任務要執行時,可做些處理。
ThreadPoolExecutor 本身也有預設的拒絕策略,也可自定義來靈活配置。
看看類圖:
在這裡插入圖片描述
再看看官文的介紹:

public interface RejectedExecutionHandler
-->  A handler for tasks that cannot be executed by a ThreadPoolExecutor.
該方法只有一個介面: 
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
--> Method that may be invoked by a ThreadPoolExecutor when execute cannot accept a task. 
This may occur when no more threads or queue slots are available because their bounds 
would  be exceeded, or upon shutdown of the Executor.
In the absence of other alternatives, the method may throw an unchecked RejectedExecutionException, 
which will be propagated (傳播)to the caller of execute.
--> Parameters:
   r - the runnable task requested to be executed
   executor - the executor attempting to execute this task
--> Throws:
   RejectedExecutionException - if there is no remedy

大白話就是:
這個介面提供了當一個執行緒池(佇列)由於執行緒數或者佇列容量不夠而需要“拒絕”這個任務時的處理方式。
需要尤其注意的是:設定了自定義的拒絕策略後,假如在關閉執行緒後,依然提交任務,可能導致執行緒池關閉失敗。
而使用預設的拒絕策略,關閉執行緒後再提交任務,將會直接丟擲異常。

public class MyRejectedExecutionDemo1 {

    public static void main(String[] args) {
        Callable<String> callable = () -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + " running ");
            return name;
        };
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        // 設定了 拒絕策略處理器
        RejectedExecutionHandler rejectedExecutionHandler = (r, executor1) -> {
            if (r instanceof FutureTask) { // 從這裡我們可知:執行緒池中實際執行任務的其實是FutureTask這個類
                System.out.println(r + "  rejected !");
                try {
                    // 本例中,這裡不會拋異常,拒絕的任務根本不會執行
                    System.out.println(((FutureTask) r).get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        };
        executor.setRejectedExecutionHandler(rejectedExecutionHandler);

        executor.submit(callable);

        // 關閉了執行緒池資源
        executor.shutdownNow();
        // 但之後依然提交了任務,====所以執行緒池依然hold住,並沒有關閉====
        executor.submit(callable);
    }
}
// output:
/*
    pool-1-thread-1 running 
    [email protected]  rejected !

 !*/

5.9、execute()、submit()的區別

  1. 傳參不同
    a)execute(Runnable r)
    b)submit(Runnable task ) --> 執行成功,Future.get() == null
    || submit(Callable task )
    || submit(Runnable task,T result) --> 利用T 物件去接收執行結果
  2. execute() --> 無返回結果;
    submit() --> 有返回值
  3. 異常處理:
  • execute() 執行過程中有異常在子執行緒中就直接列印堆疊,當然可以通過ThreadFactory方式 對任務執行緒setUncaughtExceptionHandler 捕獲。
    (本質上,setUncaughtExeceptionHandler()是Thread的方法)
  • submit(new Callable () )可以顯式捕獲ExecutionException()

關於異常處理的N種姿勢,在此說明:

public class DifferenceDemo {

    public static void main(String[] args) {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            executor.execute(() -> {
                int i = 1 / 0;
                System.out.println(" 1/0 ends !");
            });
            // execute --> 只能執行Runnable,異常預設會從子執行緒中直接丟擲,而不是從父執行緒丟擲
        } catch (Exception e) {
            System.out.println("    A  這裡的異常   不會被  列印");
            e.printStackTrace();
        }

        executor.execute(() -> {
            try {
                int i = 1 / 0;
                System.out.println(" 1/0 ends !");
            } catch (Exception e) {
                System.out.println("    B 這裡的異常     會被  列印");
                e.printStackTrace();
            }
        });

        try {
            Future<?> future1 = executor.submit(() -> {
                int i = 1 / 0;
                System.out.println(" 1/0 ends !");
            });
            // submit --> 可以執行 Runnable + Callable 兩種任務,異常會被吞掉
        } catch (Exception e) {
            System.out.println("    C  這裡的異常   不被  列印");
            e.printStackTrace();
        }

        try {
            Future<?> future2 = executor.submit(() -> {	// 這裡不丟擲異常
                int i = 1 / 0;
                System.out.println(" 1/0 ends !");
            });
            future2.get(); // 此處會丟擲異常
            // 本質上::InterrupttedException、ExecutionException(這裡直接用“大異常”Exception 代替
            了)是通過 future .get()  抓或拋的,而不是通過submit 本身。**

        } catch (Exception e) {
            System.out.println("    D   這裡的異常   也會  被列印");
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

下例中:若不 try catch,丟擲的NPE無從知曉!還要注意的是:當你在Runnable裡抓了異常處理,而不重新丟擲的話(下面就是這種寫法),future.get()時將不會獲取Runnable中的異常。

	```
	Future<?> future = executor.submit(() -> {
	    try {
	        System.out.println(" i am submit ");
	        String a = null;
	        System.out.println(a.indexOf(0));
	    } catch (Exception e) {
	        e.printStackTrace();
	    }
	});
	future.get();
	```

因此:使用多執行緒的時候一定要記得try catch 或者 throw 的處理,否則連出錯的棧軌跡都沒有!!(巨坑!)

		```
		pool.setThreadFactory(new ThreadFactory() {
		    @Override
		    public Thread newThread(Runnable r) {
		        Thread thread = new Thread(r);
		        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
		            @Override
		            public void uncaughtException(Thread t, Throwable e) {
		                System.out.println(t.getName());
		                System.out.println(e);
		            }
		        });
		        return thread ;
		// 一定要在此處 return 掉 工廠中生成的 執行緒物件
		    }
		});
		
		pool.execute(() -> {
		    String a = null;
		    System.out.println(a.indexOf(1)); 
		// NPE!,但是程式會繼續執行,而不是顯式throwh出來並中斷了程式。
		});
		System.out.println("--------");
		// 上面setXX可換成 lambda表示式
		
		```

5.9 future的缺點

Callable | Future 最大的好處是能夠拿到執行緒執行的結果
Future的預設實現類:FutureTask
Future f = ThreadPoolExecutor.submit(callable )的時候,本質也是利用FutureTask來處理。
缺點:future.get() 拿結果是阻塞的,而且主執行緒並不能保證先完成的任務返回值被先拿到
怎麼去理解呢?
就是說:主執行緒中有兩個任務A 、B可以get 結果, A慢B快,如果是按A、B的順序依次get(比如在迴圈裡),那一定要等A get到結果後,B才能get 結果。所以你看 B雖然快,但還是不能先get()到結果。