011Java併發包011非同步呼叫
1 Future
1.1 說明
在執行多個任務的時候,可以使用Java標準庫提供的執行緒池,提交的任務只需要實現Runnable介面,就可以交給執行緒池去執行。
但是Runnable介面有個問題,那就是沒有返回值。為此,Java標準庫還提供了Callable介面,比Runnable介面多了返回值。
JDK在1.5之後增加了一個Future介面,用於獲取非同步執行的結果。
在Executor中提供的execute方法之外,在Executor的子介面ExecutorService中還提供了submit方法,submit方法的返回值是Future型別的物件。
1.2 使用
1.2.1 通過Callable獲得返回值
使用Callable作為入參,獲取Future結果:
1 <T> Future<T> submit(Callable<T> task);
這種方式是最常用的一種,通過Callable物件儲存結果,通過Future物件返回獲取結果。
舉例如下:
1 public static void main(String[] args) { 2 ExecutorService executor = Executors.newSingleThreadExecutor(); 3 Future<String> future = executor.submit(4 new Callable<String>() { 5 @Override 6 public String call() throws Exception { 7 return "Hello World"; 8 } 9 } 10 ); 11 try { 12 System.out.print(future.get()); 13 } catch (Exception e) {14 e.printStackTrace(); 15 } finally { 16 executor.shutdown(); 17 } 18 }
1.2.2 通過傳入指定物件獲取返回值
使用Runnable和返回物件作為入參,獲取Future結果:
1 <T> Future<T> submit(Runnable task, T result);
通過傳入的物件儲存返回值,通過Future物件返回獲取結果。
舉例如下:
1 public static void main(String[] args) { 2 final String[] result = new String[1]; 3 ExecutorService executor = Executors.newSingleThreadExecutor(); 4 Future<String[]> future = executor.submit( 5 new Runnable() { 6 @Override 7 public void run() { 8 result[0] = "Hello World"; 9 } 10 }, result 11 ); 12 try { 13 System.out.print(future.get()[0]); 14 } catch (Exception e) { 15 e.printStackTrace(); 16 } finally { 17 executor.shutdown(); 18 } 19 }
1.2.3 無需返回值
使用Runnable作為入參:
1 Future<?> submit(Runnable task);
這種方式無需獲取返回值。
舉例如下:
1 public static void main(String[] args) { 2 ExecutorService executor = Executors.newSingleThreadExecutor(); 3 Future future = executor.submit( 4 new Runnable() { 5 @Override 6 public void run() { 7 } 8 } 9 ); 10 try { 11 System.out.print(future.get()); 12 } catch (Exception e) { 13 e.printStackTrace(); 14 } finally { 15 executor.shutdown(); 16 } 17 }
2 CompletableFuture
2.1 說明
雖然Future以及相關使用方法提供了非同步執行任務的能力,但是對於結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和非同步程式設計的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結果。
在JDK1.8中,CompletableFuture提供了非常強大的Future的擴充套件功能,簡化了非同步程式設計的複雜性,並且提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果,也提供了轉換和組合CompletableFuture的方法。
CompletableFuture實現了CompletionStage介面和Future介面,CompletionStage代表非同步計算過程中的某一個階段,一個階段完成以後可能會觸發另外一個階段。一個階段的執行可能是被單個階段的完成觸發,也可能是由多個階段一起觸發。
2.2 使用
2.2.1 新建
CompletableFuture提供了四個靜態方法來建立一個非同步操作:
1 public static CompletableFuture<Void> runAsync(Runnable runnable) 2 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 3 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 4 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync表示建立無返回值的非同步任務,相當於Runnable作為引數的submit方法:
1 public static void main(String[] args) { 2 CompletableFuture<Void> cf = CompletableFuture.runAsync(()->{ 3 System.out.println(Thread.currentThread().getName() + "-runAsync"); 4 }); 5 try { 6 System.out.println(Thread.currentThread().getName() + "-result-" + cf.get()); 7 } catch (Exception e) { 8 e.printStackTrace(); 9 } 10 }
結果如下:
1 ForkJoinPool.commonPool-worker-9-runAsync 2 main-result-null
supplyAsync表示建立帶返回值的非同步任務的,相當於Callable作為引數的submit方法:
1 public static void main(String[] args) { 2 CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{ 3 System.out.println(Thread.currentThread().getName() + "-supplyAsync"); 4 return "test"; 5 }); 6 try { 7 System.out.println(Thread.currentThread().getName() + "-result-" + cf.get()); 8 } catch (Exception e) { 9 e.printStackTrace(); 10 } 11 }
結果如下:
1 ForkJoinPool.commonPool-worker-9-supplyAsync 2 main-result-test
2.2.2 完成時回撥
當CompletableFuture的計算結果完成,可以執行特定的方法:
1 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) 2 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) 3 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
whenComplete會在當前執行的執行緒繼續執行CompletableFuture定義的任務:
1 public static void main(String[] args) { 2 CompletableFuture<Integer> future = new CompletableFuture<Integer>(); 3 new Thread(() -> { 4 System.out.println(Thread.currentThread().getName() + "-啟動"); 5 try { 6 Thread.sleep(1000); 7 } catch (Exception e) { 8 e.printStackTrace(); 9 } 10 System.out.println(Thread.currentThread().getName() + "-完成"); 11 future.complete(100); 12 System.out.println(Thread.currentThread().getName() + "-結束"); 13 }).start(); 14 future.whenComplete((t, u) -> { 15 try { 16 Thread.sleep(1000); 17 } catch (Exception e) { 18 e.printStackTrace(); 19 } 20 System.out.println(Thread.currentThread().getName() + "-t-" + t); 21 System.out.println(Thread.currentThread().getName() + "-u-" + u); 22 }); 23 try { 24 System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); 25 } catch (Exception e) { 26 e.printStackTrace(); 27 } 28 }
執行結果:
1 Thread-0-啟動 2 //等待1s 3 Thread-0-完成 4 main-return-100 5 //等待1s 6 Thread-0-v-100 7 Thread-0-ex-null 8 Thread-0-結束
whenCompleteAsync會線上程池起一個新的執行緒執行CompletableFuture定義的任務:
1 public static void main(String[] args) { 2 CompletableFuture<Integer> future = new CompletableFuture<Integer>(); 3 new Thread(() -> { 4 System.out.println(Thread.currentThread().getName() + "-啟動"); 5 try { 6 Thread.sleep(1000); 7 } catch (Exception e) { 8 e.printStackTrace(); 9 } 10 System.out.println(Thread.currentThread().getName() + "-完成"); 11 future.complete(100); 12 System.out.println(Thread.currentThread().getName() + "-結束"); 13 }).start(); 14 future.whenCompleteAsync((t, u) -> { 15 try { 16 Thread.sleep(1000); 17 } catch (Exception e) { 18 e.printStackTrace(); 19 } 20 System.out.println(Thread.currentThread().getName() + "-t-" + t); 21 System.out.println(Thread.currentThread().getName() + "-u-" + u); 22 }); 23 try { 24 System.out.println(Thread.currentThread().getName() + "-return-" + future.get()); 25 // 主執行緒不要立刻結束,否則CompletableFuture預設使用的執行緒池會立刻關閉 26 Thread.sleep(2000); 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 }
執行結果:
1 Thread-0-啟動 2 //等待1s 3 Thread-0-完成 4 main-return-100 5 Thread-0-結束 6 //等待1s 7 ForkJoinPool.commonPool-worker-9-v-100 8 ForkJoinPool.commonPool-worker-9-ex-null 9 //等待1s
2.2.3 異常處理
當CompletableFuture的計算丟擲異常的時候,可以執行特定的方法:
1 public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
exceptionally會在丟擲異常的時候執行:
1 public static void main(String[] args) { 2 CompletableFuture<Integer> future = new CompletableFuture<Integer>(); 3 new Thread(() -> { 4 future.completeExceptionally(new NullPointerException()); 5 }).start(); 6 future.exceptionally(e -> { 7 e.printStackTrace(); 8 return 100; 9 }); 10 }
結果如下:
1 java.lang.NullPointerException
2.2.4 執行緒依賴
當一個執行緒依賴另一個執行緒時,可以使用方法來把這兩個執行緒序列化:
1 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 2 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) 3 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
序列化執行:
1 public static void main(String[] args) { 2 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ 3 System.out.println(Thread.currentThread().getName() + "-supplyAsync"); 4 return "test"; 5 }).thenApply(t->{ 6 System.out.println(Thread.currentThread().getName() + "-t-" + t); 7 return t + "-apply"; 8 }); 9 try { 10 System.out.println(Thread.currentThread().getName() + "-" + future.get()); 11 } catch (Exception e) { 12 e.printStackTrace(); 13 } 14 }
結果如下:
1 ForkJoinPool.commonPool-worker-9-supplyAsync 2 ForkJoinPool.commonPool-worker-9-t-test 3 main-test-apply
2.2.5 消費處理
接收任務的處理結果,並消費處理,無返回結果:
1 public CompletionStage<Void> thenAccept(Consumer<? super T> action) 2 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) 3 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
消費處理結果:
1 public static void main(String[] args) { 2 CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{ 3 System.out.println(Thread.currentThread().getName() + "-supplyAsync"); 4 return "test"; 5 }).thenAccept(t->{ 6 System.out.println(Thread.currentThread().getName() + "-t-" + t); 7 }); 8 try { 9 System.out.println(Thread.currentThread().getName() + "-" + future.get()); 10 } catch (Exception e) { 11 e.printStackTrace(); 12 } 13 }
結果如下:
1 ForkJoinPool.commonPool-worker-9-supplyAsync 2 main-t-test 3 main-null
2.2.6 取消返回
不接收任務的處理結果,取消返回:
1 public CompletionStage<Void> thenRun(Runnable action) 2 public CompletionStage<Void> thenRunAsync(Runnable action) 3 public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor)
取消返回:
1 public static void main(String[] args) { 2 CompletableFuture<Void> future = CompletableFuture.supplyAsync(()->{ 3 System.out.println(Thread.currentThread().getName() + "-supplyAsync"); 4 return "test"; 5 }).thenRun(()->{ 6 System.out.println(Thread.currentThread().getName() + "-t-null"); 7 }); 8 try { 9 System.out.println(Thread.currentThread().getName() + "-" + future.get()); 10 } catch (Exception e) { 11 e.printStackTrace(); 12 } 13 }
結果如下:
1 ForkJoinPool.commonPool-worker-9-supplyAsync 2 main-t-null 3 main-null