1. 程式人生 > 其它 >多執行緒的實現、CompletableFuture非同步任務、@Async註解非同步呼叫

多執行緒的實現、CompletableFuture非同步任務、@Async註解非同步呼叫

技術標籤:非同步任務多執行緒futurejava

多執行緒的實現、CompletableFuture非同步任務、@Async註解非同步呼叫

java

一、非同步和多執行緒有什麼區別?

其實,非同步和多執行緒並不時一個同等關係,非同步是目的,多執行緒只是我們實現非同步的一個手段.

什麼是非同步?

非同步是當一個呼叫請求傳送給被呼叫者,而呼叫者不用等待其結果的返回.實現非同步可以採用多執行緒技術或則交給另外的程序來處理

多執行緒和非同步操作兩者都可以達到避免呼叫執行緒阻塞的目的,從而提高軟體的可響應性。甚至有些時候我們就認為多執行緒和非同步操作是等同的概念。但是,多執行緒和非同步操作還是有一些區別的。而這些區別造成了使用多執行緒和非同步操作的時機的區別。

非同步和多執行緒是兩個不同的概念,不能這樣比較.非同步請求一般用在IO等耗時操作上,他的好處是函式呼叫立即返回,相應的工作執行緒立即返還給系統以供重用。由於系統的執行緒資源是非常寶貴的,通常有一定的數目限制,如.net預設是25。若使用非同步方式,用這些固定數目的執行緒在固定的時間內就可以服務更多的請求,而如果用同步方式,那麼每個請求都自始至終佔用這一個執行緒,伺服器可以同時服務的請求數就少了。當非同步操作執行完成後,系統會從可用執行緒中選取一個執行回撥程式,這時的這個執行緒可能是剛開始發出請求的那個執行緒,也可能是其他的執行緒,因為系統選取執行緒是隨機的事情,所以不能說絕對不是剛開始的那個執行緒。多執行緒是用來併發的執行多個任務。

不過有個問題,非同步有時優先順序比主執行緒還高。這個特點和多執行緒不同。

詳見:https://blog.csdn.net/qq_36936155/article/details/78991050

二、多執行緒方式

java多執行緒實現方式主要有三種:繼承Thread類、實現Runnable介面、使用ExecutorServiceCallableFuture實現有返回結果的多執行緒。

其中前兩種方式執行緒執行完後都沒有返回值,只有最後一種是帶返回值的。

1、繼承Thread類實現多執行緒

繼承Thread類的方法儘管被我列為一種多執行緒實現方式,但Thread本質上也是實現了Runnable介面的一個例項,它代表一個執行緒的例項,並且,啟動執行緒的唯一方法就是通過Thread類的start()例項方法。start()方法是一個native方法,它將啟動一個新執行緒,並執行run()方法。這種方式實現多執行緒很簡單,通過自己的類直接extend Thread,並複寫run()方法,就可以啟動新執行緒並執行自己定義的run()方法。例如:

 public class MyThread extends Thread {
     @Override
     public void run() {
         System.out.println(Thread.currentThread().getName() + ": 使用thread初始化了一個執行緒");
     }
 }

在合適的地方啟動執行緒:

// 啟動MyThread執行緒
for (int i = 0; i < 10; i++) {
    new MyThread().start();
}

2、實現Runnable介面方式實現多執行緒

如果自己的類已經extends另一個類,就無法直接extends Thread,此時,必須實現一個Runnable介面,如下:

public class MyRunnable extends OtherClass implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": 使用runnable初始化一個執行緒");
    }
}

為了啟動MyThread,需要首先例項化一個Thread,並傳入自己的MyRunnable例項

 for (int i = 0; i < 10; i++) {
     new Thread(new MyRunnable()).start();
 }

匿名內部類的方式

 for (int i = 0; i < 10; i++) {
     new Thread(new Runnable() {
         @Override
         public void run() {
             System.out.println(Thread.currentThread().getName() + ": 使用runnable初始化一個執行緒");
         }
     }).start();
 }
 // Thread本質上也是實現了Runnable介面的一個例項(匿名內部類簡化)
 for (int i = 0; i < 10; i++) {
     new Thread(() -> {
         System.out.println(Thread.currentThread().getName() + ": 使用runnable匿名內部類初始化一個執行緒");
     }).start();
 }

3、實現Callable介面通過FutureTask包裝器來建立Thread執行緒

public class MyCallable implements Callable<String> {
    @Override
    public String call() {
        System.out.println(Thread.currentThread().getName() + ": 使用Callable初始化一個執行緒");
        return "zhangsan";
    }
}

呼叫返回Future物件的get()方法,從Future物件上獲取任務的返回值,會阻塞直到計算完成。

不管是異常還是正常,只要執行完畢了,isDone()方法結果一樣是true

 for (int i = 0; i < 10; i++) {
     FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
     new Thread(futureTask).start();
     //System.out.println(futureTask.get());  // 阻塞
     while (!futureTask.isDone()) { // 輪詢
         System.out.println("有結果了嗎?");
     }
     System.out.println("對方同意了!");
     System.in.read();
 }

4、使用ExecutorService、Callable、Future實現有返回結果的多執行緒

ExecutorService、Callable、Future這個物件實際上都是屬於Executor框架中的功能類。

**ExecutorService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。**如果Executor後臺執行緒池還沒有完成Callable的計算,這呼叫返回Future物件的get()方法,會阻塞直到計算完成。

 // 建立固定數目執行緒的執行緒池
 ExecutorService executorService = Executors.newFixedThreadPool(3);
 for (int i = 0; i < 10; i++) {
     executorService.submit(() -> {
         System.out.println(Thread.currentThread().getName() + ": 執行緒池執行任務!");
     });
 }
 // 如果Executor後臺執行緒池還沒有完成Callable的計算,這呼叫返回Future物件的get()方法,會阻塞直到計算完成。
 for (int i = 0; i < 10; i++) {
     Future<String> submit = executorService.submit(new MyCallable());
     System.out.println(submit.get().toString());
 }
 // 關閉執行緒池
 executorService.shutdown();

5、通過執行緒池建立執行緒

避免使用Executors建立執行緒池主要是為了避免其中的預設實現,可以改用ThreadPoolExecutor構造方法指定引數即可。

需要指定核心執行緒池的大小、最大執行緒池的數量、保持存活的時間、等待佇列容量的大小。在這種情況下一旦提交的執行緒數超過當前可用的執行緒數時就會丟擲拒絕執行的異常java.util.concurrent.RejectedExecutionException 有界佇列已經滿了便無法處理新的任務。

上述程式碼中Executors類,提供了一系列工廠方法用於建立執行緒池,返回的執行緒池都實現了ExecutorService介面。

建立固定數目執行緒的執行緒池。
public static ExecutorService newFixedThreadPool(int nThreads)
建立一個可快取的執行緒池,呼叫execute 將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新執行緒並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。
public static ExecutorService newCachedThreadPool()
建立一個單執行緒化的Executor。
public static ExecutorService newSingleThreadExecutor()
建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

 long start = System.currentTimeMillis();
 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(500));
 for (int i = 0; i < 100; i++) {
     threadPoolExecutor.execute(() -> {
         try {
             Thread.sleep(500);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println(Thread.currentThread().getName() + ": 自定義執行緒池執行任務");
     });
 }
 // 關閉執行緒池 - 執行後停止接受新任務,會把佇列的任務執行完畢。
 threadPoolExecutor.shutdown();
 // 關閉執行緒池 - 也是停止接受新任務,但會中斷所有的任務,將執行緒池狀態變為 stop。
 //threadPoolExecutor.shutdownNow();
 // 會每隔一秒鐘檢查一次是否執行完畢(狀態為 TERMINATED),當從 while 迴圈退出時就表明執行緒池已經完全終止了。
 while (!threadPoolExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
     LOGGER.info("執行緒還在執行。。。");
 }
 long end = System.currentTimeMillis();
 LOGGER.info("一共處理了【{}】", (end - start));
使用工具類來建立執行緒池:

除了自己定義的ThreadPool之外,還可以使用開源庫apache guava等。

個人推薦使用guavaThreadFactoryBuilder() 來建立執行緒池:

 /**
  * ThreadFactory 為執行緒池建立的執行緒命名
  */
 private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

 public static void main(String[] args) {
     // 執行緒池建立 指定屬性
     ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS,
             new ArrayBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());
     for (int i = 0; i < 10; i++) {
         pool.execute(() -> System.out.println("測試一下guava命名的執行緒:" + Thread.currentThread().getName()));
     }
 }

使用上面的方法建立執行緒池不僅可以避免OOM的問題,還可以自定義執行緒名稱,更加方便出錯時溯源。

6、定時任務

開發中,往往遇到另起執行緒執行其他程式碼的情況,用java定時任務介面ScheduledExecutorService來實現。

ScheduledExecutorService是基於執行緒池設計的定時任務類,每個排程任務都會分配到執行緒池中的一個執行緒去執行,也就是說,任務是併發執行,互不影響。

注意,只有當排程任務來的時候,ScheduledExecutorService才會真正啟動一個執行緒,其餘時間ScheduledExecutorService都是處於輪詢任務的狀態。

// 建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

// scheduleAtFixedRate() 每次執行時間為上一次任務開始起向後推一個時間間隔,是基於固定時間間隔進行任務排程
scheduledExecutorService.scheduleAtFixedRate(() -> {
    System.out.println(Thread.currentThread().getName() + ": 定時執行任務!" + new Date());
}, 5, 10, TimeUnit.SECONDS);

// scheduleWithFixedDelay() 每次執行時間為上一次任務結束起向後推一個時間間隔,取決於每次任務執行的時間長短
scheduledExecutorService.scheduleWithFixedDelay(() -> {
    System.out.println(Thread.currentThread().getName() + ": 定時執行任務!" + new Date());
}, 5, 10, TimeUnit.SECONDS);

 // 只執行一次延時任務
 ScheduledExecutorService scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
         new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
 scheduledThreadPoolExecutor.schedule(() -> {
     System.out.println(Thread.currentThread().getName() + ": 定時執行任務!");
 }, 20, TimeUnit.SECONDS);

ScheduleAtFixedRate每次執行時間為上一次任務開始起向後推一個時間間隔,即每次執行時間為initialDelay,initialDelay+period,initialDelay+2*period。。。。。

ScheduleWithFixedDelay每次執行時間為上一次任務結束起向後推一個時間間隔,即每次執行時間為:initialDelay,initialDelay+executeTime+delay,initialDelay+2*executeTime+2*delay。。。。。

由此可見,ScheduleAtFixedRate是基於固定時間間隔進行任務排程,ScheduleWithFixedDelay取決於每次任務執行的時間長短,是基於不固定時間間隔進行任務排程

三、用CompletableFuture實現非同步任務

CompletableFutureFuture API的擴充套件。

Future 被用於作為一個非同步計算結果的引用。提供一個 isDone() 方法來檢查計算任務是否完成。當任務完成時,get() 方法用來接收計算任務的結果。

專案需求:

專案中需要優化一個介面,這個介面需要拉取2,3個第三方介面,需求延遲時間小於200ms;

技術選型:

在Java中CompletableFuture用於非同步程式設計,非同步程式設計是編寫非阻塞的程式碼,執行的任務在一個單獨的執行緒,與主執行緒隔離,並且會通知主執行緒它的進度,成功或者失敗。

在這種方式中,主執行緒不會被阻塞,不需要一直等到子執行緒完成。主執行緒可以並行的執行其他任務。使用這種並行方式,可以極大的提高程式的效能。

CompletableFutureJDK8提出的一個支援非阻塞的多功能的Future,同樣也是實現了Future介面,FutureJava 5新增的類,用來描述一個非同步計算的結果。java8對future進一步完善,擴充套件了諸多功能形成了CompletableFuture,它擁有Future所有的功能,包括獲取非同步執行結果,取消正在執行的任務等。

1、CompletableFuture功能介紹

CompletableFuture還是一個CompletionStage

我們看下CompletableFuture的定義:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

什麼是CompletionStage呢?

在非同步程式中,如果將每次的非同步執行都看成是一個stage的話,我們通常很難控制非同步程式的執行順序,在javascript中,我們需要在回撥中執行回撥。這就會形成傳說中的回撥地獄。

好在在ES6中引入了promise的概念,可以將回調中的回撥轉寫為鏈式呼叫,從而大大的提升了程式的可讀性和可寫性。

同樣的在java中,我們使用CompletionStage來實現非同步呼叫的鏈式操作。CompletionStage定義了一系列的then*** 操作來實現這一功能。

詳見:https://segmentfault.com/a/1190000022197398

2、使用CompletableFuture作為Future實現

使用無構參建構函式建立此類的例項

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

這是一個最簡單的 CompletableFuture,想獲取CompletableFuture 的結果可以使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法會一直阻塞直到 Future 完成。因此,以上的呼叫將被永遠阻塞,因為該Future一直不會完成。

另請注意,get方法丟擲一些已檢查的異常,即ExecutionException(封裝計算期間發生的異常)和InterruptedException(表示執行方法的執行緒被中斷的異常)

你可以使用 CompletableFuture.complete() 手工的完成一個 Future:

completableFuture.complete("Future's Result")

所有等待這個 Future 的客戶端都將得到一個指定的結果,並且 completableFuture.complete() 之後的呼叫將被忽略。

public Future<String> calculateAsync() {

    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

這種建立和完成CompletableFuture的方法可以與任何併發包(包括原始執行緒)一起使用。

如果你知道執行的結果,那麼可以使用CompletableFuturecompletedFuture方法來直接返回一個Future。

public Future<String> useCompletableFuture(){
    Future<String> completableFuture =
            CompletableFuture.completedFuture("Hello");
    return completableFuture;
}

假設我們沒有找到結果並決定完全取消非同步執行任務。CompletableFuture還提供了一個cancel方法來立馬取消任務的執行。

public Future<String> calculateAsyncWithCancellation() {
    
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
    
    return completableFuture;
}

當我們使用Future.get()方法阻塞結果時,cancel()表示取消執行,它將丟擲CancellationException異常。java.util.concurrent.CancellationException

3、非同步執行

上面的程式碼很簡單,下面介紹幾個 static 方法,它們使用任務來例項化一個 CompletableFuture 例項。

CompletableFuture提供了runAsyncsupplyAsync的方法,可以以非同步的方式執行程式碼。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)

這兩個方法是executor 的升級,表示讓任務在指定的執行緒池中執行,不指定的話,通常任務是在 ForkJoinPool.commonPool() 執行緒池中執行的

3.1、runAsync()

方法接收的是 Runnable 的例項,但是它沒有返回值

public void runAsync() {
    CompletableFuture.runAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任務! runAsync");
    }, Executors.newFixedThreadPool(3));
}

3.2、supplyAsync()

方法是JDK8函式式介面,無引數,會返回一個結果

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            LOGGER.info("初始化CompletableFuture子任務! supplyAsync");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Result of the asynchronous computation";
        }
    });

    String result = future.get();
    LOGGER.info(result);
}

使用lambda表示式使得上面的示例更加簡明:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    LOGGER.info("初始化CompletableFuture子任務! supplyAsync");
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

4、轉換和執行

對於構建非同步系統,我們應該附上一個回撥給CompletableFuture,當Future完成的時候,自動的獲取結果。如果我們不想等待結果返回,我們可以把需要等待Future完成執行的邏輯寫入到回撥函式中。可以使用thenApplyAsync(), thenAccept()thenRun()方法附上一個回撥給CompletableFuture

為了控制執行回撥任務的執行緒,你可以使用非同步回撥。將從ForkJoinPool.commonPool()獲取不同的執行緒執行。此外,如果你傳入一個ExecutorthenApplyAsync()回撥中,,任務將從Executor執行緒池獲取一個執行緒執行。

4.1、 thenApplyAsync()

在兩個任務任務A,任務B中,任務B想要任務A計算的結果,可以用thenApplyAsync方法來接受一個函式例項,用它來處理結果,並返回一個Future函式的返回值:

模板

CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");

任務 A 執行完執行 B,B 需要 A 的結果,同時任務 B 有返回值。

多個任務的情況下,如果任務 B 後面還有任務 C,往下繼續呼叫.thenApplyAsync()即可。

實戰

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任務! supplyAsync");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    CompletableFuture<String> completableFuture = future.thenApplyAsync(resultA -> {
        LOGGER.info(resultA);
        return "Hello " + resultA;
    }).thenApplyAsync(resultB -> {
        LOGGER.info(resultB);
        return resultB + ", Welcome to the arjun Blog";
    });

    System.out.println(completableFuture.get());
    // Prints - Hello Result of the asynchronous computation, Welcome to the arjun Blog
}

4.2、thenAcceptAsync()

在兩個任務任務A,任務B中,如果你不需要在Future中有返回值,則可以用 thenAcceptAsync方法接收將計算結果傳遞給它。最後的future.get()呼叫返回Void型別的例項。CompletableFuture.thenAcceptAsync()持有一個Consumer<T>,返回一個CompletableFuture<Void>。它可以訪問CompletableFuture的結果:

模板

CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 

CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
  • runAsync不會有返回值,方法thenAcceptAsync,接收到的resultA值為null,同時任務B也不會有返回結果

  • supplyAsync有返回值,同時任務B不會有返回結果。

實戰

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任務! supplyAsync");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    CompletableFuture<Void> completableFuture = future.thenAcceptAsync(resultA -> {
        LOGGER.info("Computation returned: {}", resultA);
    });

    System.out.println(completableFuture.get());
    // Prints - null
}

4.3、thenRunAsync()

如果你不想從你的回撥函式中返回任何東西,僅僅想在Future完成後執行一些程式碼片段,你可以使用thenAcceptAsync()thenRunAsync()方法,這些方法經常在呼叫鏈的最末端的最後一個回撥函式中使用。thenRun()不能訪Future的結果,它持有一個Runnable返回CompletableFuture<Void>

模板

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
  • runAsync不會有返回值,方法thenRunAsync(Runnable runnable),任務 A 執行完執行 B,並且 B 不需要 A 的結果。

  • supplyAsync有返回值,方法thenRunAsync(Runnable runnable),任務 A 執行完執行 B,會返回resultA,但是 B 不需要 A 的結果。

實戰

public void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("初始化CompletableFuture子任務! supplyAsync");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    CompletableFuture<Void> completableFuture = future.thenRunAsync(() -> System.out.println("Computation finished."));
    // Prints - Computation finished.
    System.out.println(completableFuture.get());
    // Prints - null
}

5、組合Futures

上面講到CompletableFuture的一個重大作用就是將回調改為鏈式呼叫,從而將Futures組合起來。

5.1. 使用 thenCompose()組合兩個獨立的future

thenCompose將前一個Future的返回結果作為後一個操作的輸入。

模板

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

實戰

假設你想從一個遠端API中獲取一個使用者的詳細資訊,一旦使用者資訊可用,你想從另外一個服務中獲取他的貸方。

考慮下以下兩個方法getUserDetail()getCreditRating()的實現:

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

現在讓我們弄明白當使用了thenApply()後是否會達到我們期望的結果?

 CompletableFuture<CompletableFuture<Double>> result = getUsersDetail(userId)
         .thenApply(user -> getCreditRating(user));

在以上thenApply的示例中,Supplier函式傳入thenApply將返回一個簡單的值,但是在本例中,將返回一個CompletableFuture。以上示例的最終結果是一個巢狀的CompletableFuture
如果你想獲取最終的結果給最頂層future,使用 thenCompose()方法代替

CompletableFuture<Double> result = getUsersDetail(userId)
        .thenCompose(user -> getCreditRating(user));

因此,規則就是-如果你的回撥函式返回一個CompletableFuture,但是你想從CompletableFuture鏈中獲取一個直接合並後的結果,這時候你可以使用thenCompose()。因此,如果想要繼續巢狀連結CompletableFuture 方法,那麼最好使用thenCompose()

thenApply()thenCompose()之間的區別

thenCompose()方法類似於thenApply()在都返回一個新的計算結果。但是,thenCompose()使用前一個Future作為引數。它會直接使結果變新的Future,而不是我們在thenApply()中到的巢狀Future,而是用來連線兩個CompletableFuture,是生成一個新的CompletableFuture

5.2、使用thenCombine()組合兩個獨立的 future

如果要執行兩個獨立的任務,並對其結果執行某些操作,可以用Future的thenCombine方法。被用來當兩個獨立的Future都完成的時候,用來做一些事情。

模板

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});

cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");

實戰

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

當兩個Future都完成的時候,傳給thenCombine()的回撥函式將被呼叫。

5.3、使用thenAcceptBoth()組合兩個獨立的 future

更簡單的情況是,當你想要使用兩個Future結果時,但不需要將任何結果值進行返回時,可以用thenAcceptBoth,它表示後續的處理不需要返回值,而 thenCombine 表示需要返回值:

如果你不想返回結果,則可以使用thenAcceptBoth

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
        .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));

6、並行執行多個任務

我們使用thenCompose()thenCombine()把兩個CompletableFuture組合在一起。當我們需要並行執行多個任務時,我們通常希望等待所有它們執行,然後處理它們的組合結果。現在如果你想組合任意數量的CompletableFuture,應該怎麼做?

我們可以使用以下兩個方法組合任意數量的CompletableFuture

API

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs){...}

6.1、CompletableFuture.allOf()

CompletableFuture.allOf靜態方法允許等待所有的完成任務:

CompletableFuture.allOf的使用場景是當你一個列表的獨立future,並且你想在它們都完成後並行的做一些事情。

實戰場景

假設你想下載一個網站的100個不同的頁面。你可以序列的做這個操作,但是這非常消耗時間。因此你想寫一個函式,傳入一個頁面連結,返回一個CompletableFuture,非同步的下載頁面內容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // TODO Code to download and return the web page's content
    });
} 

現在,當所有的頁面已經下載完畢,你想計算包含關鍵字CompletableFuture頁面的數量。可以使用CompletableFuture.allOf()達成目的。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的問題是它返回CompletableFuture<Void>。這種方法的侷限性在於它不會返回所有任務的綜合結果。相反,你必須手動從Futures獲取結果。幸運的是,CompletableFuture.join()方法和Java 8 Streams API可以解決:

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           //.map(pageContentFuture -> pageContentFuture.join())
       	   .map(CompletableFuture::join)
           .collect(Collectors.toList());
});

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區別在於 join() 丟擲的是 unchecked Exception。這使得它可以在Stream.map()方法中用作方法引用。

現在讓我們計算包含關鍵字頁面的數量。

 // Count the number of web pages having the "CompletableFuture" keyword.
 CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
     return pageContents.stream()
             .filter(pageContent -> pageContent.contains("CompletableFuture"))
             .count();
 });

 System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

6.2、CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介紹的一樣,當任何一個CompletableFuture完成的時候【相同的結果型別】,返回一個新的CompletableFuture

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); 
// Prints - Result of Future 2

在以上示例中,當三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。因為future2的休眠時間最少,因此它最先完成,最終的結果將是future2的結果。

CompletableFuture.anyOf()傳入一個Future可變引數,返回CompletableFuture<Object>CompletableFuture.anyOf()的問題是如果你的CompletableFuture返回的結果是不同型別的,這時候你講會不知道你最終CompletableFuture是什麼型別。

7、異常處理

我們探尋了怎樣建立CompletableFuture,轉換它們,並組合多個CompletableFuture。現在讓我們弄明白當發生錯誤的時候我們應該怎麼做。

首先讓我們明白在一個回撥鏈中錯誤是怎麼傳遞的。思考下以下回調鏈:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

如果在原始的supplyAsync()任務中發生一個錯誤,這時候沒有任何thenApply會被呼叫並且future將以一個異常結束。如果在第一個thenApply發生錯誤,這時候第二個和第三個將不會被呼叫,同樣的,future將以異常結束。

7.1、使用 exceptionally() 回撥處理異常

exceptionally()回撥給你一個從原始Future中生成的錯誤恢復的機會。你可以在這裡記錄這個異常並返回一個預設值。

API

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);

看下程式碼

CompletableFuture.supplyAsync(() -> "resultA")
    .thenApply(resultA -> resultA + " resultB")
    .thenApply(resultB -> resultB + " resultC")
    .thenApply(resultC -> resultC + " resultD");

上面的程式碼中,任務 A、B、C、D 依次執行,如果任務 A 丟擲異常(當然上面的程式碼不會丟擲異常),那麼後面的任務都得不到執行。如果任務 C 丟擲異常,那麼任務 D 得不到執行。

那麼我們怎麼處理異常呢?看下面的程式碼,我們在任務 A 中丟擲異常,並對其進行處理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException();
})
        .exceptionally(ex -> "ERROR - ResultA")
        .thenApply(resultA -> resultA + ", resultB")
        .thenApply(resultB -> resultB + ", resultC")
        .thenApply(resultC -> resultC + ", resultD");
System.out.println(future.join());
// Prints - ERROR - ResultA, resultB, resultC, resultD

上面的程式碼中,任務 A 丟擲異常,然後通過.exceptionally() 方法處理了異常,並返回新的結果,這個新的結果將傳遞給任務 B。

實戰情景

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    LOGGER.error("Oops! We have an exception - {}", ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get()); 
// Prints - Maturity : Unknown!

7.2、使用 handle() 方法處理異常

API提供了一個更通用的方法 - handle()從異常恢復,無論一個異常是否發生它都會被呼叫。

API

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

如果在鏈式呼叫的時候丟擲異常,則可以在最後使用handle來接收。

public void handleError() throws ExecutionException, InterruptedException {
    String name = null;
    
    CompletableFuture<String> completableFuture
            =  CompletableFuture.supplyAsync(() -> {
        if (name == null) {
            throw new RuntimeException("Computation error!");
        }
        return "Hello, " + name;
    }).handle((res, ex) -> res != null ? res : "Hello, Stranger!");

    System.out.println(completableFuture.get());
    // Prints - Hello, Stranger!
}

實戰情景

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        LOGGER.error("Oops! We have an exception - {}", ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());
// Prints - Maturity : Unknown!

如果異常發生,res引數將是 null,否則,ex將是 null。

7.3、使用whenComplete()方法處理異常

API

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

返回傳進去的值

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    throw new RuntimeException("error!");
});

future.whenComplete((result, exception) -> {
    if (null == exception) {
        System.out.println("result from previous task: " + result);
    } else {
        System.err.println("Exception thrown from previous task: " + exception.getMessage());
    }
});

exceptionally()的使用非常類似於try{}catch{}中的catch{},但是由於支援鏈式程式設計方式,所以相對更簡單。

whenComplete()handle()系列方法就類似於 try{}finally{}中的finally{},無論是否發生異常都會執行 whenComplete()中的回撥函式 BiConsumerhandle()中的回撥函式 BiFunction。顧名思義,BiConsumer是直接消費的,而BiFunction是有返回值的,

whenComplete()handle() 的區別在於whenComplete()不支援返回結果,而handle()是支援返回結果的。

8、Async字尾方法

CompletableFuture類中的API的大多數方法都有兩個帶有Async字尾的附加修飾。這些方法表示用於非同步執行緒。

沒有Async字尾的方法使用呼叫執行緒執行下一個執行執行緒階段。不帶Async方法使用ForkJoinPool.commonPool()執行緒池的fork / join實現運算任務。帶有Async方法使用傳遞式的Executor任務去執行。

9、JDK 9 CompletableFuture API

Java 9中, CompletableFuture API通過以下更改得到了進一步增強:

  • 新工廠方法增加了
  • 支援延遲和超時
  • 改進了對子類化的支援。

引入了新的例項API

  • Executor defaultExecutor()
  • CompletableFuture<U> newIncompleteFuture()
  • CompletableFuture<T> copy()
  • CompletionStage<T> minimalCompletionStage()
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
  • CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
  • CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
  • CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)

還有一些靜態實用方法:

  • Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
  • Executor delayedExecutor(long delay, TimeUnit unit)
  • <U> CompletionStage<U> completedStage(U value)
  • <U> CompletionStage<U> failedStage(Throwable ex)
  • <U> CompletableFuture<U> failedFuture(Throwable ex)

最後,為了解決超時問題,Java 9又引入了兩個新功能:

  • orTimeout()
  • completeOnTimeout()

四、Springboot 的非同步呼叫 @Async註解

@Async預設非同步配置使用的是SimpleAsyncTaskExecutor,該執行緒池預設來一個任務建立一個執行緒,在大量的請求的時候,這時就會不斷建立大量執行緒,極有可能壓爆伺服器記憶體
@Async的時候一定要設定執行緒數,以防萬一OOM

​ 非同步呼叫,類似我們多年前的ajax呼叫,區域性重新整理,整體不變,當然,在java的後臺的非同步呼叫,類似於自己實現一個多執行緒的程式,任務開啟一個執行緒後由它最去執行,我們其實是不能干預太多的。。

​ 在實際的開發中,如果某一個方法需要非同步去執行,那麼我們可以在它前面加上註解。@Async

1、 @Async介紹

​ 在Spring中,基於@Async標註的方法,稱之為非同步方法;這些方法將在執行的時候,將會在獨立的執行緒中被執行,呼叫者無需等待它的完成,即可繼續其他的操作。分為不帶引數的非同步呼叫;帶引數的非同步呼叫;呼叫返回Future的非同步執行緒

2、@Async呼叫中的事務處理機制

@Async標註的方法,同時也適用了@Transactional進行了標註;

​ 在其呼叫資料庫操作之時,將無法產生事務管理的控制,原因就在於其是基於非同步處理的操作。 **那該如何給這些操作新增事務管理呢?**可以將需要事務管理操作的方法放置到非同步方法內部,在內部被呼叫的方法上新增@Transactional.

例如:

方法A,使用了@Async/@Transactional來標註,但是無法產生事務控制的目的。

方法B,使用了@Async來標註, B中呼叫了C、D,C/D分別使用@Transactional做了標註,則可實現事務控制的目的。

3、配合使用@EnableAsync

啟動類或者Controller類加上@EnableAsync註解

@SpringBootApplication
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
@EnableAsync
@RestController
public class HelloController {
    
    @Autowired
    TestAsyncService testAsyncService;

}

@EnableAsync註解的意思是可以非同步執行,就是開啟多執行緒的意思。可以標註在方法、類上。@Async所修飾的函式不要定義為static型別,這樣非同步呼叫不會生效

4、舉例:

比如需要呼叫一個傳送簡訊的任務,實際簡訊是渠道方去發的,那麼我們在把請求提交過去基本就結束了,這個時候就可以做一個非同步的呼叫來實現。

先使用@EnableAsync來開啟非同步的支援,配置一個執行緒池:

Spring 4 中,對非同步方法可以做一些配置,將配置類實現AsyncConfigurer 介面後,可以實現自定義執行緒池的功能,和統一處理非同步方法的異常。

如果不限制併發數,可能會造成系統壓力。

AsyncConfigurer 介面中的方法 Executor getAsyncExecutor() 實現自定義執行緒池。控制併發數。

AsyncConfigurer 介面中的方法 public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()用於處理非同步方法的異常。

AsyncUncaughtExceptionHandler 介面,只有一個方法:void handleUncaughtException(Throwable ex, Method method, Object… params);

因此,AsyncUncaughtExceptionHandler 介面可以認為是一個函式式介面,可以用拉姆達表示式實現該介面。當然不處理也是可以的,沒用直接返回null

@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
     
    /**
     * ThreadFactory 為執行緒池建立的執行緒命名
     */
    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("example-pool-%d").build();
    
    /**
     * 獲取非同步執行緒池執行物件
     */
    @Override
    public Executor getAsyncExecutor() {
        // 使用Spring內建執行緒池任務物件
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 設定執行緒池引數
        executor.initialize();
        // 獲取到伺服器的cpu核心
        int i = Runtime.getRuntime().availableProcessors();
        // 核心池大小
        executor.setCorePoolSize(5);
        // 最大執行緒數
        executor.setMaxPoolSize(100);
        // 佇列容量
        executor.setQueueCapacity(1000);
        // 執行緒空閒時間(秒)
        executor.setKeepAliveSeconds(1000);
        // 執行緒字首名稱
        executor.setThreadNamePrefix("task-async-");
        // 設定拒絕策略:當pool已經達到max size的時候,如何處理新任務 CALLER_RUNS:不在新執行緒中執行任務,而是有呼叫者所在的執行緒來執行
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		// 等待所有任務結束後再關閉執行緒池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
//        return (throwable, method, objects) -> System.out.println(
//                "-- exception handler -- " + throwable + "-- method -- " + method + "-- objects -- " + objects);
    }
    
    
    /**
     * 自定義執行緒池
     */
    @Bean(name = "asyncTaskExecutor")
    public Executor asyncTaskExecutor() {
        //獲取CPU 核心數
        int nThreads = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                nThreads,
                2 * nThreads + 5,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy()); 
        // 先行建立符合corePoolSize引數值的執行緒數
        threadPoolExecutor.prestartAllCoreThreads();
        return threadPoolExecutor;
    }
}

如果我們專案中需要多個執行緒池,則可以參考asyncTaskExecutor()方法例項出自定義的執行緒池asyncTaskExecutor,在使用的時候使用@Async("asyncTaskExecutor")來實現使用asyncTaskExecutor的執行緒池實現非同步操作。

@RequestMapping("")
public String doTask() throws InterruptedException {
    long currentTimeMillis = System.currentTimeMillis();
    System.out.println(Thread.currentThread().getName()+"主執行緒請求非同步執行task1()");
    this.task1();
    System.out.println(Thread.currentThread().getName()+"主執行緒請求非同步執行task1()結束");
    System.out.println(Thread.currentThread().getName()+"主執行緒請求非同步執行task2()");
    this.task2();
    System.out.println(Thread.currentThread().getName()+"主執行緒請求非同步執行task2()結束");
    this.task3();
    long currentTimeMillis1 = System.currentTimeMillis();
    return "task任務總耗時:" + (currentTimeMillis1 - currentTimeMillis) + "ms";
}

然後在指定需要非同步執行方法上加入@Async註解,並自定執行緒池(當然可以不指定,直接寫@Async

@Async("asyncTaskExecutor")
public void task1() throws InterruptedException{  
    long currentTimeMillis = System.currentTimeMillis();  
    System.out.println("task1," + Thread.currentThread().getName() + "," + new Date());
    Thread.sleep(1000);  
    long currentTimeMillis1 = System.currentTimeMillis();  
    System.out.println("task1任務非同步執行耗時:"+(currentTimeMillis1-currentTimeMillis)+"ms");  
}  

@Async
public void task2() throws InterruptedException{  
    long currentTimeMillis = System.currentTimeMillis();  
    System.out.println("task2," + Thread.currentThread().getName() + "," + new Date());
    Thread.sleep(2000);  
    long currentTimeMillis1 = System.currentTimeMillis();  
    System.out.println("task2任務非同步執行耗時:"+(currentTimeMillis1-currentTimeMillis)+"ms");  
}  

直接寫@Async不指定執行緒池時,如果執行緒池配置只配了上面asyncTaskExecutor一種,則會預設使用該執行緒池執行,結果和上面一樣,如果執行緒池配置配置了多個執行緒池,則此時不指定執行緒池時則會使用系統預設的SimpleAsyncTaskExecutor執行緒執行

將其中一個非同步方法,寫一行會產生異常的程式碼:

@Async
public void task3() {
    System.out.println("task3," + Thread.currentThread().getName() + "," + new Date());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    int i=100/0;//丟擲異常

    System.out.println("task3," + Thread.currentThread().getName() + "," + new Date());
}

執行後的結果如下:

task3,example-pool-2,Sat Sep 15 21:52:48 CST 2018
-- exception handler -- java.lang.ArithmeticException: / by zero-- method -- public void com.learn.AsyncDemo.task3()-- objects -- [Ljava.lang.Object;@1626ab0b
task3,example-pool-2,Sat Sep 15 21:52:50 CST 2018

兩個非同步方法,使用的是同一個執行緒執行的。異常的處理也由AsyncUncaughtExceptionHandler介面處理掉了。

5、@Async失效

在同一個類中,一個方法呼叫另外一個有註解(比如@Async@Transational)的方法,註解是不會生效的。

比如,下面程式碼例子中,有兩方法,一個有@Async註解,一個沒有。第一次如果呼叫了有註解的test()方法,會啟動@Async註解作用;第一次如果呼叫testAsync(),因為它內部呼叫了有註解的test(),如果你以為系統也會為它啟動Async作用,那就錯了,實際上是沒有的。

@Async 基於AOP的,呼叫同類中的方法不走代理。

@Service
public class TestAsyncService {

    public void testAsync() throws Exception {
        //這裡的呼叫相當於:this.test();   這個this是service本身 而不是spring的service代理物件.所以aop不生效.
        test();
    }

    @Async
    public void test() throws InterruptedException{
        Thread.sleep(10000);//讓執行緒休眠,根據輸出結果判斷主執行緒和從執行緒是同步還是非同步
        System.out.println("非同步threadId:"+Thread.currentThread().getId());
    }
}

執行結果:testAsync()主執行緒 和 從執行緒 test() 同步執行。

原因:spring在掃描bean的時候會掃描方法上是否包含@Async註解,如果包含,spring會為這個bean動態地生成一個子類(即代理類,proxy),代理類是繼承原來那個bean的。此時,當這個有註解的方法被呼叫的時候,實際上是由代理類來呼叫的,代理類在呼叫時增加非同步作用。然而,如果這個有註解的方法是被同一個類中的其他方法呼叫的,那麼該方法的呼叫並沒有通過代理類,而是直接通過原來的那個bean,所以就沒有增加非同步作用,我們看到的現象就是@Async註解無效。