Java響應式程式設計之CompletableFuture學習(一)
本文主要是介紹CompletableFuture的基本使用方法,在後面一篇文章中會寫一個簡單的demo。
1.建立一個完成的CompletableFuture
這種情況通常我們會在計算的開始階段使用它。
CompletableFuture<String> cf = CompletableFuture.completedFuture("message"); // 建立一個預定義結果
assertTrue(cf.isDone()); // 是否完成
assertEquals("message", cf.getNow(null)); // 結果比對
cf.getNow
如果任務完成則獲得返回值,如果呼叫時未完成則返回設定的預設值(null)
2.執行同步任務
thenApply 該方法將返回一個新的CompletionStage,並且將操作函式的返回值作為泛型引數
long main_thread_id = Thread.currentThread().getId(); CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> { assertEquals(main_thread_id, Thread.currentThread().getId()); assertFalse(Thread.currentThread().isDaemon()); return s.toUpperCase(); // 此處返回String 所以泛型為String }); assertEquals("MESSAGE", cf.getNow(null));
3.執行非同步任務
CompletableFuture的方法中 Async 字尾的均為非同步操作
CompletableFuture在非同步執行任務的時候,不指定Executor的情況下,非同步執行通過 ForkJoinPool 實現
非同步操作的時候,我們使用 join 來等待任務完成
long main_thread_id = Thread.currentThread().getId(); CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertNotEquals(main_thread_id, Thread.currentThread().getId()); //由於非同步了 所以與主執行緒id不一致 // 非同步執行通過ForkJoinPool實現, 它使用守護執行緒去執行任務 assertTrue(Thread.currentThread().isDaemon()); return s.toUpperCase(); }); assertNull(cf.getNow(null)); // 此時任務未完成返回null assertEquals("MESSAGE", cf.join()); // 通過join等待返回結果
4.使用Executor 來非同步執行任務
執行任務方法都可以使用指定的Executor來執行操作
ExecutorService executorService = Executors.newFixedThreadPool(3, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "HHH");
}
});
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertEquals("HHH", Thread.currentThread().getName()); // 使用指定執行緒池內執行緒執行
assertFalse(Thread.currentThread().isDaemon()); // 此時非守護程序執行
return s.toUpperCase();
}, executorService); // 填入 指定執行緒池 executorService
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join()); // 通過join等待返回結果
5.消費遷移
下一階段接收了當前階段的結果,如果不需要使用返回值時,可以使用 thenAccept
StringBuffer result = new StringBuffer();
// 此時不存在返回結果 所以不需要用泛型
CompletableFuture cf = CompletableFuture.completedFuture("message").thenAcceptAsync(result::append);
assertEquals("此時result長度為0", 0, result.length());
cf.join(); // 非同步需要等待執行完成
assertEquals("此時result長度為7", 7, result.length());
6.異常處理
我們使用 handle 來定義出現以異常時的的處理方式,這個處理操作也是可以非同步的 handleAsync
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
int a = 1 / 0; // 發生異常
return s;
});
// 如果在非同步的時候需要通知CompletableFuture 發生了異常 可以使用 cf.completeExceptionally(e)
// 如果發生異常 則返回 發生了異常
CompletableFuture exceptionHandler = cf.handle((s, th) -> (th != null) ? "發生了異常" : "");
try {
cf.join();
fail("此處不會被執行");
} catch (CompletionException ex) { // just for testing
assertEquals("/ by zero", ex.getCause().getMessage()); // 不能除以0異常
}
assertEquals("發生了異常", exceptionHandler.join());
whenComplete 操作需要接收兩個引數 ,第一個引數為返回值 第二個為異常資訊
CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
int a = 1 / 0; // 發生異常
return s;
}).whenComplete((v,th)->{
assertNotEquals(null,th); // 產生了異常
});
CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase).whenComplete((v,th)->{
assertNull(th); // 未產生異常
assertEquals("MESSAGE",v);
});
7.取消任務
通過呼叫 cancel 來取消任務 ,或者使用 completeExceptionally(new CancellationException())
CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
fail("任務被取消此段不執行");
return s;
});
CompletableFuture<String> cf2 = cf.exceptionally(e -> "取消任務");
cf.cancel(true); // 取消任務
//cf.completeExceptionally(new CancellationException()); // 效果等同
assertTrue(cf.isCompletedExceptionally()); // 異常中斷
assertEquals("取消任務", cf2.join());
8. applyToEither
applyToEither 只要有一個執行完成 則終止另外一個 如果同時執行完成,使用當前任務返回結果,而不是applyToEither方法中指定的任務的返回結果
String original = "Message";
CompletableFuture<String> cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> {
try {
Thread.sleep(1000); // 延時 測試效果
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("當前任務執行");
return s.toUpperCase();
});
/*
applyToEither
只要有一個執行完成 則終止另外一個 如果同時執行完成 使用當前返回結果
*/
CompletableFuture<String> cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> {
System.out.println("指定任務執行");
return s.toLowerCase();
}),
s -> s + " from applyToEither");
String result = cf2.join();
System.out.println(result);
assertTrue(result.endsWith(" from applyToEither"));
9. acceptEither
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> {
System.out.println("當前任務執行");
return s.toUpperCase();
})
.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> {
System.out.println("指定任務執行");
return s.toLowerCase();
}),
s -> result.append(s).append("from acceptEither"));
cf.join();
assertTrue(result.toString().endsWith("from acceptEither"));
10.runAfterBoth
runAfterBoth 無返回值 當前任務和指定任務都完成時,才執行指定操作。
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(s -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("當前任務執行");
return s.toUpperCase();
}).runAfterBoth(
CompletableFuture.completedFuture(original).thenApply(s -> {
System.out.println("指定任務執行");
return s.toLowerCase();
}),
() -> result.append("done"));
System.out.println(original);
assertTrue("Result was empty", result.length() > 0);
11.thenAcceptBoth
thenAcceptBoth 無返回值,該方法將兩個任務的返回值作為指定操作的引數
String original = "Message";
StringBuffer res = new StringBuffer();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
(s, s2) -> {
res.append(s).append(s2);
}
);
assertEquals("MESSAGEmessage", res.toString());
12.thenCombine
thenCombine 有返回值 ,該方法將兩個任務的返回值作為指定操作的引數
String original = "Message";
CompletableFuture<String> res = CompletableFuture.completedFuture(original)
.thenApply(String::toUpperCase)
.thenCombine(
CompletableFuture.completedFuture(original)
.thenApply(String::toLowerCase),
(s, s2) -> s + s2);
assertEquals("MESSAGEmessage", res.getNow(null));
13.thenCompose
thenCompose 有返回值 ,該方法將當前任務的返回值 作為指定操作的引數
String original = "Message";
CompletableFuture<String> res = CompletableFuture.completedFuture(original)
.thenApply(String::toUpperCase)
.thenCompose(
s -> CompletableFuture.completedFuture(original)
.thenApply(String::toLowerCase)
.thenApply(s2 -> s + s2));
assertEquals("MESSAGEmessage", res.getNow(null));
14.anyOf
anyOf 有返回值,如果有一個任務完成(包括異常) 則整個任務完成
StringBuilder result = new StringBuilder();
List<String> messages = Arrays.asList("a", "b", "c");
CompletableFuture.anyOf(messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApply(String::toUpperCase))
.toArray(CompletableFuture[]::new))
.whenComplete((res, th) -> {
if (th == null) {
assertTrue(isUpperCase(((String) res).charAt(0)));
result.append(res);
}
});
assertEquals(result.length(), 1);
StringBuilder result_except = new StringBuilder();
CompletableFuture<?> cf = CompletableFuture.anyOf(messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApply(s -> {
if (s.equals("a")) {
int a = 1 / 0;
}
return s.toUpperCase();
}))
.toArray(CompletableFuture<?>[]::new))
.whenComplete((res, th) -> {
if (th == null) {
assertFalse(isUpperCase(((String) res).charAt(0)));
result_except.append(res);
}
});
CompletableFuture<String> exceptionHandler = cf.handle((s, th) -> (th != null) ? "發生了異常" : "");
assertEquals(result_except.length(), 0);
assertEquals(exceptionHandler.join(), "發生了異常");
15.allOf
allOf 所有任務完成在執行操作
- 如果所有任務為同步操作 那麼 allOf指定的操作也是同步的
List<String> messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List<CompletableFuture<String>> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApply(String::toUpperCase))
.collect(Collectors.toList());
long main_id = Thread.currentThread().getId();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
assertEquals(main_id,Thread.currentThread().getId());
if (th == null) { // 未發生異常
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null).charAt(0))));
result.append("done");
}
});
assertEquals("done", result.toString());
- 如果所有任務為非同步操作 那麼 allOf指定的操作也是非同步的
List<String> messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List<CompletableFuture<String>> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApplyAsync(s -> {
try {
Thread.sleep(1000); // 如果此處沒有延時 allof的完成操作將不是非同步的
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}))
.collect(Collectors.toList());
CompletableFuture cf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
System.out.println(Thread.currentThread().getName());
futures.forEach(f -> assertTrue(isUpperCase(f.getNow(null).charAt(0))));
result.append("done");
});
assertEquals(0, result.length());
cf.join();
assertEquals("done", result.toString());
樓主疑問
在測試 非同步的 allOf 的時候發現,JVM 會對任務是否非同步進行優化 。
如果執行任務時間短暫且當前執行緒不繁忙,那麼任務將會同步執行
下面貼上測試程式碼 不知道大家是否也是一樣的結果
- 任務無延時
// 此時任務無延時
public void test18() {
int count = 0; // 記錄非同步次數
for (int i = 0; i < 1000; i++) {
if (test_allof_no_dealy()) count++;
}
System.out.println("出現非同步次數:" + count);
}
public boolean test_allof_no_dealy() {
List<String> messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List<CompletableFuture<String>> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApplyAsync(String::toUpperCase))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
assertFalse(Thread.currentThread().isDaemon()); // 主執行緒執行
futures.forEach(f -> assertTrue(isUpperCase(f.getNow(null).charAt(0))));
result.append("done");
});
if (result.length() == 0) return true; // 此時非同步
return false; // 同步
}
多次執行結果: 出現非同步次數:512 出現非同步次數:483
- 任務延時0.001s
// 此時任務延時0.001s
public void test19() {
int count = 0; // 記錄非同步次數
for (int i = 0; i < 1000; i++) {
if (test_allof_dealy()) count++;
}
System.out.println("出現非同步次數:" + count);
}
public boolean test_allof_dealy() {
List<String> messages = Arrays.asList("a", "b", "c");
StringBuilder result = new StringBuilder();
List<CompletableFuture<String>> futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg)
.thenApplyAsync(s -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}))
.collect(Collectors.toList());
CompletableFuture cf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((v, th) -> {
futures.forEach(f -> assertTrue(isUpperCase(f.getNow(null).charAt(0))));
result.append("done");
});
if (result.length() == 0) return true; // 此時非同步
return false; // 同步
}
多次執行結果: 出現非同步次數:1000 出現非同步次數:1000