1. 程式人生 > 其它 >執行緒、執行緒池以及CompletableFuture組合式非同步程式設計

執行緒、執行緒池以及CompletableFuture組合式非同步程式設計

技術標籤:佇列java多執行緒python併發程式設計

一、建立執行緒的三種常見方式

1、繼承Thread

  1. 建立執行緒類,繼承Thread
  2. new Thread().start()的方式啟動執行緒
public static void main(String[] args) {
    System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
    new Thread(new Thread01(), "thread01").start();
    System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread01 extends Thread {
    @Override
    public void run() {
        System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
        int i = 10 / 5;
        System.out.println("計算結果為" + i);
    }
}

2、實現Runnable介面

  1. 建立執行緒類,實現Runnable介面
  2. new Thread(執行緒類).start()的方式啟動執行緒
public static void main(String[] args) {

    System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
    new Thread(new Thread02(), "thread02").start();
    System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread02 implements Runnable {
    @Override
    public void run() {
        System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
        int i = 10 / 5;
        System.out.println("計算結果為" + i);
    }
}

3、實現Callable<T>介面

  1. 建立執行緒類,實現Callable介面,可以有返回值
  2. 建立FutureTask<T> futureTask = new FutureTask<>(執行緒類);
  3. new Thread(futureTask).start()的方式啟動執行緒
  4. futureTask.get()獲取返回值
public static void main(String[] args) throws Exception {
    System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
    FutureTask<String> stringFutureTask = new FutureTask<>(new Thread03());
    new Thread(stringFutureTask).start();
    System.out.println(stringFutureTask.get());
    System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢");
}
static class Thread03 implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
        int i = 10 / 5;
        System.out.println("計算結果為" + 10);
        return "返回到主程式了" + i;
    }
}

二、使用執行緒池執行執行緒

1、Executors自帶的執行緒池

固定大小執行緒池newFixedThreadPool

// @param nThreads 執行緒數量,核心執行緒數和最大執行緒數均為該值
// @param threadFactory 建立執行緒的工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

單執行緒池,按序執行newSingleThreadExecutor

// @param threadFactory 建立執行緒的工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

可快取的執行緒池,newCachedThreadPool

// @nThreads 執行緒數量,核心執行緒數和最大執行緒數均為該值
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

2、執行緒池的實現原理

所有Executors框架提供的執行緒池底層均為java.util.concurrent.ThreadPoolExecutor

/**
 * 通過給定的引數建立一個執行緒池.
 *
 * @param corePoolSize 一直存活的執行緒數,即使空閒,除非設定了allowCoreThreadTimeOut。
 * @param maximumPoolSize 最大存活執行緒數
 * @param keepAliveTime 當前存活執行緒數大於核心執行緒數時,空閒執行緒等待新任務最大時間
 * @param unit 引數keepAliveTime的時間單位
 * @param workQueue 任務執行前儲存任務的佇列。只儲存由execute方法提交的Runnable任務。
 * @param threadFactory 新建執行緒的執行緒工廠
 * @param handler 拒絕策略
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3、執行緒池的執行

  1. 建立執行緒池物件
  2. 使用執行緒池物件execute進行
static ExecutorService executorService = Executors.newFixedThreadPool(4);

public static void main(String[] args) {
    System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
    executorService.execute(new Thread01());  	// 沒有返回值的非同步執行
    executorService.execute(new Thread02());
    executorService.submit(new Thread01());		// 有返回值的非同步執行
    executorService.submit(new Thread02());
    System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢");
}

三、CompletableFuture組合式非同步程式設計

1、建立非同步物件

(1)runAsync supplyAsync方法

CompletableFuture 提供了四個靜態方法來建立一個非同步操作。

  • runAsync方法不支援返回值。
  • supplyAsync可以支援返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

(2) 計算結果完成時的回撥方法

CompletableFuture的計算結果完成,或者丟擲異常的時候,可以執行特定的Action。主要是下面的方法:

//可以處理異常,無返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

//可以處理異常,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的型別是BiConsumer<? super T,? super Throwable>它可以處理正常的計算結果,或者異常情況。

(3) handle 方法,任務完成後執行,可處理異常

handle 是執行任務完成時對結果的處理。 handle 方法和 thenApply 方法處理方式基本一樣。不同的是 handle 是在任務完成後再執行,還可以處理異常的任務。thenApply 只可以執行正常的任務,任務出現異常則不執行 thenApply 方法。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

(4) 執行緒序列化

等待之前任務完成後執行

  • thenApply:能接受上一步結果,有返回值
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);
  • thenAccept:能接受上一步結果,但是無返回值
public <U> CompletionStage<U> thenAccept(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action, Executor executor);
  • thenRun:不能獲取上一步的執行結果
public <U> CompletionStage<U> thenRun(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action, Executor executor);

(5) 合併任務,都要完成

thenCombine:組合兩個future,獲取兩個的結果,返回當前任務的返回值

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor);

thenAcceptBoth:組合兩個future,獲取結果,然後處理任務,沒有返回值

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor);

runAfterBoth:組合兩個future,不需要獲取結果,只需要兩個future執行完成後就執行該任務

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

(6) 合併任務,僅完成一個

applyToEither:組合兩個future完成其中一個執行,獲取它的結果,返回當前任務的返回值

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor);

acceptEither:組合兩個future完成其中一個執行,獲取一個的結果,然後處理任務,沒有返回值

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor);

runAfterEither:組合兩個future完成其中一個執行,不需要獲取結果,只需要一個future執行完成後就執行該任務

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

(7) 多工組合

allOf:等待所有任務完成

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

anyOf:只要有一個完成

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);