執行緒、執行緒池以及CompletableFuture組合式非同步程式設計
阿新 • • 發佈:2021-01-02
一、建立執行緒的三種常見方式
1、繼承Thread
類
- 建立執行緒類,繼承Thread
new Thread().start()
的方式啟動執行緒
public static void main(String[] args) { System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行"); new Thread(new Thread01(), "thread01").start(); System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢"); } static class Thread01 extends Thread { @Override public void run() { System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行"); int i = 10 / 5; System.out.println("計算結果為" + i); } }
2、實現Runnable
介面
- 建立執行緒類,實現Runnable介面
new Thread(執行緒類).start()
的方式啟動執行緒
public static void main(String[] args) { System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行"); new Thread(new Thread02(), "thread02").start(); System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢"); } static class Thread02 implements Runnable { @Override public void run() { System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行"); int i = 10 / 5; System.out.println("計算結果為" + i); } }
3、實現Callable<T>
介面
- 建立執行緒類,實現Callable介面,可以有返回值
- 建立
FutureTask<T> futureTask = new FutureTask<>(執行緒類);
new Thread(futureTask).start()
的方式啟動執行緒futureTask.get()
獲取返回值
public static void main(String[] args) throws Exception { System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行"); FutureTask<String> stringFutureTask = new FutureTask<>(new Thread03()); new Thread(stringFutureTask).start(); System.out.println(stringFutureTask.get()); System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢"); } static class Thread03 implements Callable<String> { @Override public String call() throws Exception { System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行"); int i = 10 / 5; System.out.println("計算結果為" + 10); return "返回到主程式了" + i; } }
二、使用執行緒池執行執行緒
1、Executors自帶的執行緒池
固定大小執行緒池newFixedThreadPool
// @param nThreads 執行緒數量,核心執行緒數和最大執行緒數均為該值
// @param threadFactory 建立執行緒的工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
單執行緒池,按序執行newSingleThreadExecutor
// @param threadFactory 建立執行緒的工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
可快取的執行緒池,newCachedThreadPool
// @nThreads 執行緒數量,核心執行緒數和最大執行緒數均為該值
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
2、執行緒池的實現原理
所有Executors框架提供的執行緒池底層均為java.util.concurrent.ThreadPoolExecutor
/**
* 通過給定的引數建立一個執行緒池.
*
* @param corePoolSize 一直存活的執行緒數,即使空閒,除非設定了allowCoreThreadTimeOut。
* @param maximumPoolSize 最大存活執行緒數
* @param keepAliveTime 當前存活執行緒數大於核心執行緒數時,空閒執行緒等待新任務最大時間
* @param unit 引數keepAliveTime的時間單位
* @param workQueue 任務執行前儲存任務的佇列。只儲存由execute方法提交的Runnable任務。
* @param threadFactory 新建執行緒的執行緒工廠
* @param handler 拒絕策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
3、執行緒池的執行
- 建立執行緒池物件
- 使用執行緒池物件
execute
進行
static ExecutorService executorService = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
System.out.println("執行緒" + Thread.currentThread().getName() + "開始執行");
executorService.execute(new Thread01()); // 沒有返回值的非同步執行
executorService.execute(new Thread02());
executorService.submit(new Thread01()); // 有返回值的非同步執行
executorService.submit(new Thread02());
System.out.println("執行緒" + Thread.currentThread().getName() + "執行完畢");
}
三、CompletableFuture
組合式非同步程式設計
1、建立非同步物件
(1)runAsync
和 supplyAsync
方法
CompletableFuture
提供了四個靜態方法來建立一個非同步操作。
runAsync
方法不支援返回值。supplyAsync
可以支援返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
(2) 計算結果完成時的回撥方法
當CompletableFuture
的計算結果完成,或者丟擲異常的時候,可以執行特定的Action。主要是下面的方法:
//可以處理異常,無返回值
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
//可以處理異常,有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的型別是BiConsumer<? super T,? super Throwable>
它可以處理正常的計算結果,或者異常情況。
(3) handle 方法,任務完成後執行,可處理異常
handle 是執行任務完成時對結果的處理。 handle 方法和 thenApply 方法處理方式基本一樣。不同的是 handle 是在任務完成後再執行,還可以處理異常的任務。thenApply 只可以執行正常的任務,任務出現異常則不執行 thenApply 方法。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
(4) 執行緒序列化
等待之前任務完成後執行
- thenApply:能接受上一步結果,有返回值
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor);
- thenAccept:能接受上一步結果,但是無返回值
public <U> CompletionStage<U> thenAccept(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action);
public <U> CompletionStage<U> thenAcceptAsync(Consumer<? super T> action, Executor executor);
- thenRun:不能獲取上一步的執行結果
public <U> CompletionStage<U> thenRun(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action);
public <U> CompletionStage<U> thenRunAsync(Runnable action, Executor executor);
(5) 合併任務,都要完成
thenCombine:組合兩個future,獲取兩個的結果,返回當前任務的返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor);
thenAcceptBoth:組合兩個future,獲取結果,然後處理任務,沒有返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor);
runAfterBoth:組合兩個future,不需要獲取結果,只需要兩個future執行完成後就執行該任務
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
(6) 合併任務,僅完成一個
applyToEither:組合兩個future完成其中一個執行,獲取它的結果,返回當前任務的返回值
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor);
acceptEither:組合兩個future完成其中一個執行,獲取一個的結果,然後處理任務,沒有返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor);
runAfterEither:組合兩個future完成其中一個執行,不需要獲取結果,只需要一個future執行完成後就執行該任務
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor);
(7) 多工組合
allOf:等待所有任務完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
anyOf:只要有一個完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);