1. 程式人生 > 其它 >011Java併發包011非同步呼叫

011Java併發包011非同步呼叫

本文主要學習了Java中非同步呼叫涉及到的Future和CompletableFuture。

1 Future

1.1 說明

在執行多個任務的時候,可以使用Java標準庫提供的執行緒池,提交的任務只需要實現Runnable介面,就可以交給執行緒池去執行。

但是Runnable介面有個問題,那就是沒有返回值。為此,Java標準庫還提供了Callable介面,比Runnable介面多了返回值。

JDK在1.5之後增加了一個Future介面,用於獲取非同步執行的結果。

在Executor中提供的execute方法之外,在Executor的子介面ExecutorService中還提供了submit方法,submit方法的返回值是Future型別的物件。

1.2 使用

1.2.1 通過Callable獲得返回值

使用Callable作為入參,獲取Future結果:

1 <T> Future<T> submit(Callable<T> task);

這種方式是最常用的一種,通過Callable物件儲存結果,通過Future物件返回獲取結果。

舉例如下:

 1 public static void main(String[] args) {
 2     ExecutorService executor = Executors.newSingleThreadExecutor();
 3     Future<String> future = executor.submit(
4 new Callable<String>() { 5 @Override 6 public String call() throws Exception { 7 return "Hello World"; 8 } 9 } 10 ); 11 try { 12 System.out.print(future.get()); 13 } catch (Exception e) {
14 e.printStackTrace(); 15 } finally { 16 executor.shutdown(); 17 } 18 }

1.2.2 通過傳入指定物件獲取返回值

使用Runnable和返回物件作為入參,獲取Future結果:

1 <T> Future<T> submit(Runnable task, T result);

通過傳入的物件儲存返回值,通過Future物件返回獲取結果。

舉例如下:

 1 public static void main(String[] args) {
 2   final String[] result = new String[1];
 3   ExecutorService executor = Executors.newSingleThreadExecutor();
 4   Future<String[]> future = executor.submit(
 5       new Runnable() {
 6         @Override
 7         public void run() {
 8           result[0] = "Hello World";
 9         }
10       }, result
11   );
12   try {
13     System.out.print(future.get()[0]);
14   } catch (Exception e) {
15     e.printStackTrace();
16   } finally {
17     executor.shutdown();
18   }
19 }

1.2.3 無需返回值

使用Runnable作為入參:

1 Future<?> submit(Runnable task);

這種方式無需獲取返回值。

舉例如下:

 1 public static void main(String[] args) {
 2     ExecutorService executor = Executors.newSingleThreadExecutor();
 3     Future future = executor.submit(
 4             new Runnable() {
 5                 @Override
 6                 public void run() {
 7                 }
 8             }
 9     );
10     try {
11         System.out.print(future.get());
12     } catch (Exception e) {
13         e.printStackTrace();
14     } finally {
15         executor.shutdown();
16     }
17 }

2 CompletableFuture

2.1 說明

雖然Future以及相關使用方法提供了非同步執行任務的能力,但是對於結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和非同步程式設計的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結果。

在JDK1.8中,CompletableFuture提供了非常強大的Future的擴充套件功能,簡化了非同步程式設計的複雜性,並且提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果,也提供了轉換和組合CompletableFuture的方法。

CompletableFuture實現了CompletionStage介面和Future介面,CompletionStage代表非同步計算過程中的某一個階段,一個階段完成以後可能會觸發另外一個階段。一個階段的執行可能是被單個階段的完成觸發,也可能是由多個階段一起觸發。

2.2 使用

2.2.1 新建

CompletableFuture提供了四個靜態方法來建立一個非同步操作:

1 public static CompletableFuture<Void> runAsync(Runnable runnable)
2 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
3 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
4 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync表示建立無返回值的非同步任務,相當於Runnable作為引數的submit方法:

 1 public static void main(String[] args) {
 2   CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{
 3     System.out.println(Thread.currentThread().getName() + "-runAsync");
 4   });
 5   try {
 6     System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
 7   } catch (Exception e) {
 8     e.printStackTrace();
 9   }
10 }

結果如下:

1 ForkJoinPool.commonPool-worker-9-runAsync
2 main-result-null

supplyAsync表示建立帶返回值的非同步任務的,相當於Callable作為引數的submit方法:

 1 public static void main(String[] args) {
 2   CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{
 3     System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4     return "test";
 5   });
 6   try {
 7     System.out.println(Thread.currentThread().getName() + "-result-" + cf.get());
 8   } catch (Exception e) {
 9     e.printStackTrace();
10   }
11 }

結果如下:

1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-result-test

2.2.2 完成時回撥

當CompletableFuture的計算結果完成,可以執行特定的方法:

1 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
2 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
3 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

whenComplete會在當前執行的執行緒繼續執行CompletableFuture定義的任務:

 1 public static void main(String[] args) {
 2   CompletableFuture<Integer> future = new CompletableFuture<Integer>();
 3   new Thread(() -> {
 4     System.out.println(Thread.currentThread().getName() + "-啟動");
 5     try {
 6       Thread.sleep(1000);
 7     } catch (Exception e) {
 8       e.printStackTrace();
 9     }
10     System.out.println(Thread.currentThread().getName() + "-完成");
11     future.complete(100);
12     System.out.println(Thread.currentThread().getName() + "-結束");
13   }).start();
14   future.whenComplete((t, u) -> {
15     try {
16       Thread.sleep(1000);
17     } catch (Exception e) {
18       e.printStackTrace();
19     }
20     System.out.println(Thread.currentThread().getName() + "-t-" + t);
21     System.out.println(Thread.currentThread().getName() + "-u-" + u);
22   });
23   try {
24     System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
25   } catch (Exception e) {
26     e.printStackTrace();
27   }
28 }

執行結果:

1 Thread-0-啟動
2 //等待1s
3 Thread-0-完成
4 main-return-100
5 //等待1s
6 Thread-0-v-100
7 Thread-0-ex-null
8 Thread-0-結束

whenCompleteAsync會線上程池起一個新的執行緒執行CompletableFuture定義的任務:

 1 public static void main(String[] args) {
 2   CompletableFuture<Integer> future = new CompletableFuture<Integer>();
 3   new Thread(() -> {
 4     System.out.println(Thread.currentThread().getName() + "-啟動");
 5     try {
 6       Thread.sleep(1000);
 7     } catch (Exception e) {
 8       e.printStackTrace();
 9     }
10     System.out.println(Thread.currentThread().getName() + "-完成");
11     future.complete(100);
12     System.out.println(Thread.currentThread().getName() + "-結束");
13   }).start();
14   future.whenCompleteAsync((t, u) -> {
15     try {
16       Thread.sleep(1000);
17     } catch (Exception e) {
18       e.printStackTrace();
19     }
20     System.out.println(Thread.currentThread().getName() + "-t-" + t);
21     System.out.println(Thread.currentThread().getName() + "-u-" + u);
22   });
23   try {
24     System.out.println(Thread.currentThread().getName() + "-return-" + future.get());
25     // 主執行緒不要立刻結束,否則CompletableFuture預設使用的執行緒池會立刻關閉
26     Thread.sleep(2000);
27   } catch (Exception e) {
28     e.printStackTrace();
29   }
30 }

執行結果:

1 Thread-0-啟動
2 //等待1s
3 Thread-0-完成
4 main-return-100
5 Thread-0-結束
6 //等待1s
7 ForkJoinPool.commonPool-worker-9-v-100
8 ForkJoinPool.commonPool-worker-9-ex-null
9 //等待1s

2.2.3 異常處理

當CompletableFuture的計算丟擲異常的時候,可以執行特定的方法:

1 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

exceptionally會在丟擲異常的時候執行:

 1 public static void main(String[] args) {
 2   CompletableFuture<Integer> future = new CompletableFuture<Integer>();
 3   new Thread(() -> {
 4     future.completeExceptionally(new NullPointerException());
 5   }).start();
 6   future.exceptionally(e -> {
 7     e.printStackTrace();
 8     return 100;
 9   });
10 }

結果如下:

1 java.lang.NullPointerException

2.2.4 執行緒依賴

當一個執行緒依賴另一個執行緒時,可以使用方法來把這兩個執行緒序列化:

1 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
2 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
3 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

序列化執行:

 1 public static void main(String[] args) {
 2   CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
 3     System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4     return "test";
 5   }).thenApply(t->{
 6     System.out.println(Thread.currentThread().getName() + "-t-" + t);
 7     return t + "-apply";
 8   });
 9   try {
10     System.out.println(Thread.currentThread().getName() + "-" + future.get());
11   } catch (Exception e) {
12     e.printStackTrace();
13   }
14 }

結果如下:

1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 ForkJoinPool.commonPool-worker-9-t-test
3 main-test-apply

2.2.5 消費處理

接收任務的處理結果,並消費處理,無返回結果:

1 public CompletionStage<Void> thenAccept(Consumer<? super T> action)
2 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action)
3 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

消費處理結果:

 1 public static void main(String[] args) {
 2   CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
 3     System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4     return "test";
 5   }).thenAccept(t->{
 6     System.out.println(Thread.currentThread().getName() + "-t-" + t);
 7   });
 8   try {
 9     System.out.println(Thread.currentThread().getName() + "-" + future.get());
10   } catch (Exception e) {
11     e.printStackTrace();
12   }
13 }

結果如下:

1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-t-test
3 main-null

2.2.6 取消返回

不接收任務的處理結果,取消返回:

1 public CompletionStage<Void> thenRun(Runnable action)
2 public CompletionStage<Void> thenRunAsync(Runnable action)
3 public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor)

取消返回:

 1 public static void main(String[] args) {
 2     CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{
 3         System.out.println(Thread.currentThread().getName() + "-supplyAsync");
 4         return "test";
 5     }).thenRun(()->{
 6         System.out.println(Thread.currentThread().getName() + "-t-null");
 7     });
 8     try {
 9         System.out.println(Thread.currentThread().getName() + "-" + future.get());
10     } catch (Exception e) {
11         e.printStackTrace();
12     }
13 }

結果如下:

1 ForkJoinPool.commonPool-worker-9-supplyAsync
2 main-t-null
3 main-null