1. 程式人生 > 實用技巧 >CompletableFuture詳解1

CompletableFuture詳解1

初識CompletableFuture

package com.dwz.executors;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * First look at {@link CompletableFuture}: contrasts a classic
 * {@link ExecutorService#invokeAll} pipeline with a fully asynchronous
 * {@code CompletableFuture} pipeline.
 */
public class CompletableFutureExample1 {

    /**
     * Starts a ten-second asynchronous task, prints a message to show the caller
     * was not blocked, then parks the calling thread forever.
     *
     * @throws InterruptedException if the calling thread is interrupted while parked
     */
    private static void testCompletableFuture() throws InterruptedException {
        // The task runs on a daemon thread by default, so it would end together with the main thread.
        CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).whenComplete((v, t) -> System.out.println("DONE."));
        System.out.println("=========I am not blocked.========");
        // Joining the current thread on itself never returns; it keeps the JVM alive for the daemon task.
        Thread.currentThread().join();
    }

    /**
     * Simulates a slow consumer: sleeps a random number of seconds in [0, 20)
     * and then reports the value it was given.
     *
     * @param data value to report once the simulated work is done
     */
    private static void display(int data) {
        int value = ThreadLocalRandom.current().nextInt(20);
        try {
            System.out.println(Thread.currentThread().getName() + " display will be sleep " + value);
            TimeUnit.SECONDS.sleep(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " display execute done " + data);
    }

    /**
     * Simulates a slow producer: sleeps a random number of seconds in [0, 20)
     * and returns that same number.
     *
     * @return the randomly chosen sleep duration in seconds
     */
    private static int get1() {
        int value = ThreadLocalRandom.current().nextInt(20);
        try {
            System.out.println(Thread.currentThread().getName() + " get will be sleep " + value);
            TimeUnit.SECONDS.sleep(value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " get execute done " + value);
        return value;
    }

    /**
     * Classic approach: get1() and display() run in two strict phases, because
     * invokeAll() only returns once every get1() call has finished.
     *
     * @throws InterruptedException if interrupted while waiting for the tasks
     */
    private static void testOld() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            List<Callable<Integer>> tasks = IntStream.range(0, 10)
                    .mapToObj(i -> (Callable<Integer>) () -> get1())
                    .collect(Collectors.toList());
            executorService.invokeAll(tasks).stream().mapToInt(future -> {
                try {
                    // Blocking call; every future is already done after invokeAll().
                    return future.get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).parallel().forEach(CompletableFutureExample1::display);
        } finally {
            // The pool uses non-daemon threads; without shutdown() they leak and keep the JVM alive.
            executorService.shutdown();
        }
    }

    /**
     * CompletableFuture approach: each display() starts as soon as its own get1()
     * has finished, independently of the other nine pipelines.
     *
     * @throws InterruptedException if the calling thread is interrupted while parked
     */
    private static void testNew() throws InterruptedException {
        IntStream.range(0, 10)
                .forEach(i -> CompletableFuture.supplyAsync(CompletableFutureExample1::get1)
                        .thenAccept(CompletableFutureExample1::display)
                        .whenComplete((v, t) -> System.out.println(i + "DONE.")));
        // Park the caller forever so the daemon worker threads can finish their pipelines.
        Thread.currentThread().join();
    }
}

CompletableFuture方法大綱

CompletableFuture的方法總結:
1.factory method
*supplyAsync
*runAsync
*completedFuture
*anyOf
*allOf
2.intermediate
2.1 return T
*whenComplete biconsumer
*whenCompleteAsync
*thenApply
*thenApplyAsync
*handleAsync biFunction
*handle
2.2 return void
*thenAcceptAsync
*thenAccept
*thenRunAsync
*thenRun
3.compose
* thenAcceptBoth
* acceptEither
* runAfterBothAsync
* runAfterEither
* thenCombine
* thenCompose
4.terminated
* getNow
* complete
* join
* completeExceptionally
* obtrudeException
* exceptionally

factory method 工廠方法

package com.dwz.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the factory methods of {@link CompletableFuture}:
 * {@code supplyAsync}, {@code runAsync}, {@code completedFuture},
 * {@code anyOf} and {@code allOf}.
 */
public class CompletableFutureExample2 {
    /*
     *     Scenario:
     *     insert the basic information and the detail information into a table,
     *     then perform a follow-up action; the two inserts have no ordering
     *     between them.
     *                 insert basic
     *     [submit]                       ===> do action
     *                 insert details
     */
    private static void supplyAsync() {
        CompletableFuture.supplyAsync(Object::new).thenAcceptAsync(obj -> {
            try {
                System.out.println("Object===========Start");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("Object===========" + obj);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).runAfterBoth(CompletableFuture.supplyAsync(() -> "Hello").thenAcceptAsync(s -> {
            try {
                System.out.println("String===========Start");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("String===========" + s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }), () -> System.out.println("=========Finished========"));
    }

    /**
     * Runs a five-second task asynchronously and prints a marker once it completes.
     *
     * @return a future that completes after the task and its completion callback
     */
    private static Future<?> runAsync() {
        return CompletableFuture.runAsync(() -> {
            try {
                System.out.println("Object===========Start");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("Object===========End");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).whenComplete((v, t) -> System.out.println("==========over========="));
    }

    /**
     * Wraps an already-known value in a completed future and prints it.
     *
     * @param data the value the future is completed with
     * @return the future of the printing stage (its result is {@code null})
     */
    private static Future<Void> completed(String data) {
        return CompletableFuture.completedFuture(data).thenAccept(System.out::println);
    }

    /** Starts the five-second task "1" shared by the anyOf/allOf demos. */
    private static CompletableFuture<Void> fiveSecondTask() {
        return CompletableFuture.runAsync(() -> {
            try {
                System.out.println("1===========Start");
                TimeUnit.SECONDS.sleep(5);
                System.out.println("1===========End");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).whenComplete((v, t) -> System.out.println("==========over========="));
    }

    /** Starts the four-second task "2" (yielding "Hello") shared by the anyOf/allOf demos. */
    private static CompletableFuture<String> fourSecondHello() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("2===========Start");
                TimeUnit.SECONDS.sleep(4);
                System.out.println("2===========End");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        }).whenComplete((v, t) -> System.out.println(v + "==========over========="));
    }

    /*
     *     Returns the value of only one of the futures; the other one still
     *     runs to completion asynchronously.
     */
    private static Future<?> anyOf() {
        return CompletableFuture.anyOf(fiveSecondTask(), fourSecondHello());
    }

    /*
     *     Starts both tasks asynchronously; no value is returned.
     */
    private static void allOf() {
        CompletableFuture.allOf(fiveSecondTask(), fourSecondHello());
    }

    /**
     * Shows two ways of obtaining a {@code CompletableFuture<Object>} instance.
     */
    private static void create() {
        // Diamond instead of the raw type; a directly constructed future starts out incomplete.
        CompletableFuture<Object> future = new CompletableFuture<>();
        // Factory-method counterpart. Unlike the future above, this one is
        // completed (with null) as soon as the supplier has run.
        Object s = null;
        CompletableFuture.<Object>supplyAsync(() -> s);
    }
}

測試方法:

/**
 * Runs the factory-method demos one after another.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException   if the future returned by anyOf() completed exceptionally
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    supplyAsync();
    runAsync();
    completed("dandan");
    // get() blocks until the future returned by anyOf() has a result.
    Object firstResult = anyOf().get();
    System.out.println(">>>>>" + firstResult);
    allOf();
    // Joining the current thread on itself never returns; presumably this keeps
    // the JVM alive long enough for the asynchronous tasks to finish.
    Thread.currentThread().join();
}

intermediate return T or return void

package com.dwz.executors;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Demonstrates the intermediate operations of {@link CompletableFuture}:
 * those that pass a value on (whenComplete, thenApply, handle) and those
 * that do not (thenAccept, thenRun).
 */
public class CompletableFutureExample3 {

    /** Prints a banner, pauses two seconds, then prints the closing banner. */
    private static void slowStep() {
        try {
            System.out.println("==============");
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("=======over=======");
    }

    // ---- operations that pass a value on ----

    /**
     * Registers a whenComplete callback and then prints the value of the
     * original future ("Hello").
     */
    private static void testWhenComplete() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        future.whenComplete((v, t) -> slowStep());
        System.out.println(future.get());
    }

    /**
     * Same as {@link #testWhenComplete()} but registers the callback with
     * whenCompleteAsync.
     */
    private static void testWhenCompleteAsync() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        future.whenCompleteAsync((v, t) -> slowStep());
        System.out.println(future.get());
    }

    /**
     * thenApply takes a Function: maps "Hello" to its length and prints it.
     */
    private static void testThenApply() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> {
            slowStep();
            return s.length();
        });
        System.out.println(future.get());
    }

    /**
     * Asynchronous variant of {@link #testThenApply()}.
     */
    private static void testThenApplyAsync() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<Integer> future2 = future.thenApplyAsync(s -> {
            slowStep();
            return s.length();
        });
        // Wait on the mapped stage; reading the source future would skip the async step's result.
        System.out.println(future2.get());
    }

    /**
     * handle receives both the value and the exception, and returns a
     * replacement value: here the failure is turned into 0.
     */
    private static void testHandle() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync((Supplier<String>) () -> {
            throw new RuntimeException("not get the data");
        }).handle((s, t) -> {
            // t is null when the upstream stage succeeded, so Optional.of(t) would throw there.
            Optional.ofNullable(t).ifPresent(e -> System.out.println("Error"));
            return (s == null) ? 0 : s.length();
        });
        System.out.println(future.get());
    }

    /**
     * Asynchronous variant of {@link #testHandle()}.
     */
    private static void testHandleAsync() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync((Supplier<String>) () -> {
            throw new RuntimeException("not get the data");
        });
        CompletableFuture<Integer> future2 = future.handleAsync((s, t) -> {
            // t is null when the upstream stage succeeded, so Optional.of(t) would throw there.
            Optional.ofNullable(t).ifPresent(e -> System.out.println("Error"));
            return (s == null) ? 0 : s.length();
        });
        System.out.println(future2.get());
    }

    // ---- operations that do not pass a value on ----

    /**
     * Consumes the value asynchronously, then prints the length of the
     * original future's value.
     */
    private static void thenAcceptAsync() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        future.thenAcceptAsync(System.out::println);
        System.out.println(future.get().length());
    }

    /**
     * Runs an action that ignores the value, then prints the original
     * future's value.
     */
    private static void thenRunAsync() throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        future.thenRunAsync(() -> System.out.println("done"));
        System.out.println(future.get());
    }
}

測試方法:

/**
 * Runs the intermediate-operation demos one after another.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException   if one of the awaited futures completed exceptionally
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    // Demos whose stages pass a value on.
    testWhenComplete();
    testWhenCompleteAsync();
    testThenApply();
    testThenApplyAsync();
    testHandle();
    testHandleAsync();
    // Demos whose stages do not pass a value on.
    thenAcceptAsync();
    thenRunAsync();
    // Joining the current thread on itself never returns; presumably this keeps
    // the JVM alive long enough for the asynchronous callbacks to finish.
    Thread mainThread = Thread.currentThread();
    mainThread.join();
}

compose 組合、聚合方法

package com.dwz.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the combining operations of {@link CompletableFuture}:
 * thenAcceptBoth, acceptEither, runAfterBothAsync, runAfterEither,
 * thenCombine and thenCompose.
 */
public class CompletableFutureExample4 {

    /**
     * Pauses the current thread; an interruption is only reported via its stack trace.
     *
     * @param seconds how long to pause, in seconds
     */
    private static void sleep(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Both futures run at the same time; the action receives both results. */
    private static void testThenAcceptBoth() {
        CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the supplyAsync");
            sleep(5);
            System.out.println("end the supplyAsync");
            return "thenAcceptBoth";
        });
        CompletableFuture<Integer> number = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the thenAcceptBoth");
            sleep(5);
            System.out.println("end the thenAcceptBoth");
            return 100;
        });
        text.thenAcceptBoth(number, (s, i) -> System.out.println(s + "--" + i));
    }

    /** Both futures run at the same time; the action receives the result of either one. */
    private static void testAcceptEither() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the supplyAsync");
            sleep(5);
            System.out.println("end the supplyAsync");
            return "acceptEither-1";
        });
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the acceptEither");
            sleep(5);
            System.out.println("end the acceptEither");
            return "acceptEither-2";
        });
        first.acceptEither(second, System.out::println);
    }

    /** Both futures run asynchronously; the action takes no arguments from them. */
    private static void testRunAfterBothAsync() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the supplyAsync");
            sleep(5);
            System.out.println("end the supplyAsync");
            return "runAfterBoth-1";
        });
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the runAfterBoth");
            sleep(3);
            System.out.println("end the runAfterBoth");
            return 100;
        });
        first.runAfterBothAsync(second, () -> System.out.println("DONE"));
    }

    /** The action starts once either future has finished and ignores their results. */
    private static void testRunAfterEither() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the supplyAsync");
            sleep(5);
            System.out.println("end the supplyAsync");
            return "runAfterEither-1";
        });
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the runAfterEither");
            sleep(6);
            System.out.println("end the runAfterEither");
            return 100;
        });
        first.runAfterEither(second, () -> System.out.println("DONE"));
    }

    /**
     * thenCombine versus thenAcceptBoth: the result of thenCombine carries a
     * value and can be chained further, whereas thenAcceptBoth yields void
     * and offers no value to chain on.
     */
    private static void testCombine() {
        CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the supplyAsync");
            sleep(5);
            System.out.println("end the supplyAsync");
            return "Combine-1";
        });
        CompletableFuture<Integer> number = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the thenCombine");
            sleep(5);
            System.out.println("end the thenCombine");
            return 100;
        });
        CompletableFuture<Boolean> combined = text.thenCombine(number, (s, i) -> s.length() > i);
        combined.whenComplete((v, t) -> System.out.println(v));
    }

    /**
     * The output of the first future is the input of the second one; the
     * composed future can be chained further.
     */
    private static void testCompose() {
        CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
            System.out.println("start the supplyAsync");
            sleep(5);
            System.out.println("end the supplyAsync");
            return "Compose-1";
        });
        CompletableFuture<Integer> length = text.thenCompose(s -> CompletableFuture.supplyAsync(() -> {
            System.out.println("start the thenCompose");
            sleep(3);
            System.out.println("end the thenCompose");
            return s.length();
        }));
        length.thenAccept(System.out::println);
    }
}

測試方法:

    /**
     * Runs the combining-operation demos one after another.
     *
     * @throws InterruptedException if the calling thread is interrupted while parked
     */
    public static void main(String[] args) throws InterruptedException {
        testThenAcceptBoth();
        testAcceptEither();
        testRunAfterBothAsync();
        testRunAfterEither();
        testCombine();
        testCompose();
        // Joining the current thread on itself never returns; presumably this keeps
        // the JVM alive long enough for the asynchronous tasks to finish.
        Thread mainThread = Thread.currentThread();
        mainThread.join();
    }

terminated 立即結束

package com.dwz.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the terminal operations of {@link CompletableFuture}:
 * getNow, complete, join, completeExceptionally, obtrudeException and
 * exceptionally.
 */
public class CompletableFutureExample5 {

    /**
     * Pauses the current thread; an interruption is only reported via its stack trace.
     *
     * @param seconds how long to pause, in seconds
     */
    private static void sleep(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Three-second task shared by the demos below: prints a marker and yields "Hello". */
    private static String slowHello() {
        sleep(3);
        System.out.println("--------I will be still process.");
        return "Hello";
    }

    /**
     * Reads the future after one second with a fallback value, then waits
     * for and prints the real result.
     */
    private static void getNow() throws InterruptedException, ExecutionException {
        CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
            sleep(3);
            return "Hello";
        });
        sleep(1);
        System.out.println(pending.getNow("World"));
        System.out.println(pending.get());
    }

    /**
     * Completes the future by hand after one second and prints whether that
     * call took effect, followed by the future's value.
     */
    private static void complete() throws InterruptedException, ExecutionException {
        CompletableFuture<String> pending = CompletableFuture.supplyAsync(CompletableFutureExample5::slowHello);
        sleep(1);
        System.out.println(pending.complete("World"));
        System.out.println(pending.get());
    }

    /** Waits for the task with join() and prints its result. */
    private static void join() {
        CompletableFuture<String> pending = CompletableFuture.supplyAsync(CompletableFutureExample5::slowHello);
        System.out.println(pending.join());
    }

    /**
     * Calls completeExceptionally five seconds after starting the
     * three-second task, then prints the result of get().
     */
    private static void completeExceptionally() throws InterruptedException, ExecutionException {
        CompletableFuture<String> pending = CompletableFuture.supplyAsync(CompletableFutureExample5::slowHello);
        sleep(5);
        // If the future has not finished yet, this makes it fail fast;
        // if it already completed normally, no exception is thrown.
        pending.completeExceptionally(new RuntimeException());
        System.out.println(pending.get());
    }

    /**
     * Forces the future into a failed state right away, then calls get().
     */
    private static void obtrudeException() throws InterruptedException, ExecutionException {
        CompletableFuture<String> pending = CompletableFuture.supplyAsync(CompletableFutureExample5::slowHello);
        // Regardless of the future's value, make it throw.
        pending.obtrudeException(new Exception("I am error."));
        System.out.println(pending.get());
    }

    /**
     * Keeps error handling separate from the business logic: the mapping
     * stage may fail, and exceptionally() turns the failure into its message.
     *
     * @return the original source future, not the derived stages
     */
    private static CompletableFuture<String> errorHandle() {
        CompletableFuture<String> source = CompletableFuture.supplyAsync(CompletableFutureExample5::slowHello);

        CompletableFuture<String> mapped = source.thenApply(s -> {
            Integer.parseInt(s);
            System.out.println("===========keep move============");
            return s + " WORLD";
        });
        CompletableFuture<String> recovered = mapped.exceptionally(Throwable::getMessage);
        recovered.thenAccept(System.out::println);

        return source;
    }
}

測試方法:

    /**
     * Runs the terminal-operation demos one after another.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if one of the awaited futures completed exceptionally
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        getNow();
        complete();
        join();
        completeExceptionally();
        // NOTE(review): obtrudeException() ends by calling get() on a future it has
        // just forced into a failed state, so an ExecutionException is expected to
        // escape here and the statements below would not run - confirm whether the
        // demos are meant to be run one at a time.
        obtrudeException();
        String sourceValue = errorHandle().get();
        System.out.println(sourceValue);
        // Joining the current thread on itself never returns.
        Thread.currentThread().join();
    }