fork-join: A CompletableFuture Usage Summary

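The CompletableFuture class implements the CompletionStage and Future interfaces. Future is an interface added in Java 5 to represent the result of an asynchronous computation, but it offers few ways to obtain that result: either poll isDone() and call get() once the computation has finished, or call get() with a timeout. However, get() blocks the calling thread until the result is ready, which clearly runs against the original intent of asynchronous programming.
To solve this problem, the JDK borrowed design ideas from Guava and extended Future with a number of additional capabilities, producing CompletableFuture (added in Java 8). A CompletableFuture can invoke a callback function when a thread finishes a task.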
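To make the contrast concrete, here is a minimal sketch of the two styles. The class and method names (FutureVsCallback, blockingStyle, callbackStyle) are made up for illustration and are not part of the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureVsCallback {

    // Plain Future: the caller has to block in get() to obtain the value.
    static String blockingStyle() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> "hello");
            return future.get() + " world"; // blocks the calling thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: the follow-up step is registered as a callback.
    static CompletableFuture<String> callbackStyle() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(s -> s + " world");
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle());
        // join() is used here only so the demo waits before the JVM exits.
        System.out.println(callbackStyle().join());
    }
}
```

In callbackStyle() nothing blocks until the caller decides to; the concatenation runs whenever the first stage finishes.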

The main CompletableFuture APIs are as follows.

Transforming the result

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

For example

public void thenApply() {
    String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
    System.out.println(result);
}

Output

hello world
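The three overloads differ in where the function runs: thenApply may run it on the thread that completes the previous stage (or on the caller if the stage is already complete), thenApplyAsync hands it to the default asynchronous executor, and the Executor overload hands it to the executor you pass in. The sketch below illustrates the last case; the class name and the thread name "demo-pool-thread" are assumptions chosen for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyAsyncDemo {

    // Returns the name of the thread that ran the thenApplyAsync callback.
    static String callbackThreadName() {
        // Hypothetical single-thread pool whose thread has a recognizable name.
        ExecutorService pool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "demo-pool-thread"));
        try {
            return CompletableFuture.supplyAsync(() -> "hello")
                    // The function is submitted to 'pool', not to the default executor.
                    .thenApplyAsync(s -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "demo-pool-thread"
    }
}
```

The same pattern applies to every other *Async method in this summary that accepts an Executor.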

Consuming the result

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

Example

@Test
public void thenAccept() {
    CompletableFuture.supplyAsync(() -> "hello")
            .thenAccept(s -> System.out.println(s + " world"))
            .join(); // wait, so the line is printed before the test method returns
}

Output

hello world

Running the next action without using the previous result

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

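Example

public void thenRun() {
    CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello World";
    }).thenRun(() -> System.out.println("Hello World")) // the supplier's return value is ignored
      .join(); // wait for the 5-second task to finish before returning
}

Output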

Hello World

Combining the results of two CompletionStages into a new value

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

Example

public void thenCombine() {
    String result = CompletableFuture.supplyAsync(() -> {
        return "hello ";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        return "world";
    }), (s1, s2) -> s1 + s2).join();
    System.out.println(result);
}

Output

hello world

Running an action after both CompletionStages have completed

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

Example

public void runAfterBoth() {
    CompletableFuture.supplyAsync(() -> "Hello")
            .runAfterBoth(CompletableFuture.supplyAsync(() -> " World"), () -> System.out.println("Hello World"))
            .join(); // wait until both stages are done and the action has run
}

Output

Hello World

Applying a function to whichever result completes first

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

Example

public void applyToEither() {
    String result = CompletableFuture.supplyAsync(() -> "hello")
            .applyToEither(CompletableFuture.supplyAsync(() -> " World"), s -> s)
            .join();
    System.out.println(result);
}

Output (whichever stage completes first wins, so " World" is also possible)

hello

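Consuming whichever result completes first

This API is similar to the previous one; the difference is that the previous one produces a return value, while this one only consumes the result.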

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

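Example

public void acceptEither() {
    CompletableFuture.supplyAsync(() -> "Hello")
            .acceptEither(CompletableFuture.supplyAsync(() -> "World"), System.out::println)
            .join(); // wait until one of the two results has been printed
}

Output (or "World", if the second stage completes first)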

Hello
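Because both stages in the examples above race each other, their output is not strictly deterministic. The following sketch makes the "first one wins" behavior reproducible by racing a future that is never completed against one that is already complete, and it also shows the difference in return type between applyToEither and acceptEither. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class EitherDemo {

    // applyToEither: the function's return value becomes the new stage's result.
    static String applyResult() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        return never
                .applyToEither(CompletableFuture.completedFuture("fast"), s -> s.toUpperCase())
                .join();
    }

    // acceptEither: the winning value is only consumed; the new stage yields Void.
    static Void acceptResult(StringBuilder sink) {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        return never
                .acceptEither(CompletableFuture.completedFuture("fast"), s -> sink.append(s))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(applyResult());        // prints "FAST"
        StringBuilder sink = new StringBuilder();
        System.out.println(acceptResult(sink));   // prints "null"
        System.out.println(sink);                 // prints "fast"
    }
}
```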

Running the next action as soon as either stage completes

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

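Example

public void runAfterEither() {
    CompletableFuture.supplyAsync(() -> {
        // This stage never completes.
        while (true) {
        }
    }).runAfterEither(CompletableFuture.supplyAsync(() -> "s2"), () -> System.out.println("finish"))
      .join(); // returns once the second stage has completed and the action has run
}

Output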

finish

Exception handling

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

Example

public void exceptionally() {
    String result = CompletableFuture.<String>supplyAsync(() -> {
        throw new RuntimeException("error");
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}

Output

java.lang.RuntimeException: error
hello world
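The first output line starts with the exception class name because the Throwable passed to exceptionally here is a CompletionException that wraps the exception thrown inside supplyAsync, and its message is the string form of that cause. A minimal sketch of unwrapping it follows; the class name, method name, and the "boom" message are assumptions for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionallyCauseDemo {

    // Recovers from a failure and reports the original exception, not the wrapper.
    static String recover() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }).exceptionally(e -> {
            // Unwrap the CompletionException if there is one; otherwise use e itself.
            Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                    ? e.getCause()
                    : e;
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // prints "IllegalStateException: boom"
    }
}
```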

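Recording the result

When the stage completes, the action is invoked with the result (or the exception); the returned stage keeps the original outcome.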

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

Example

public void whenComplete() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Always true; written this way so the compiler accepts the return below.
        if (1 == 1) {
            throw new RuntimeException("testing the exception case");
        }
        return "s1";
    }).whenComplete((s, t) -> {
        System.out.println(s); // null here, because the stage failed
        if (t != null) {       // t is null when the stage succeeds
            System.out.println(t.getMessage());
        }
    }).exceptionally(e -> {
        System.out.println(e.getMessage());
        return "hello world";
    }).join();
    System.out.println(result);
}
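The example above needs exceptionally after whenComplete because whenComplete only observes the outcome and cannot replace it. The sketch below, with hypothetical class and method names, checks both paths: a successful value passes through unchanged, and a failure is still a failure downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class WhenCompleteDemo {

    // Success path: the callback records the outcome, the value passes through.
    static String successPath(List<String> log) {
        return CompletableFuture.supplyAsync(() -> "s1")
                .whenComplete((s, t) -> log.add("result=" + s + ", error=" + t))
                .join();
    }

    // Failure path: whenComplete does not recover, so join() still throws.
    static boolean failureStillFails() {
        CompletableFuture<String> f = CompletableFuture.<String>supplyAsync(() -> {
            throw new RuntimeException("boom");
        }).whenComplete((s, t) -> { /* observe only */ });
        try {
            f.join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(successPath(log));     // prints "s1"
        System.out.println(log);                  // prints "[result=s1, error=null]"
        System.out.println(failureStillFails());  // prints "true"
    }
}
```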

Handling the result on completion

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

Example

public void handle() {
    String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Force an exception
        if (1 == 1) {
            throw new RuntimeException("testing the exception case");
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world"; // fallback value when the stage failed
        }
        return s;
    }).join();
    System.out.println(result);
}

Output

hello world
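Unlike whenComplete, handle replaces the outcome with whatever the function returns, so it covers both the success and the failure path in one callback. A small sketch with a hypothetical compute(boolean) helper shows both branches:

```java
import java.util.concurrent.CompletableFuture;

public class HandleDemo {

    // 'fail' is a demo switch that decides whether the first stage throws.
    static String compute(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new RuntimeException("boom");
            }
            return "s1";
        }).handle((s, t) -> t != null ? "fallback" : s) // t is null on success
          .join();
    }

    public static void main(String[] args) {
        System.out.println(compute(false)); // prints "s1"
        System.out.println(compute(true));  // prints "fallback"
    }
}
```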