1. 程式人生 > >【併發程式設計】Future模式新增Callback及Promise 模式

【併發程式設計】Future模式新增Callback及Promise 模式

Future

Future是Java5增加的類,它用來描述一個非同步計算的結果。你可以使用 isDone 方法檢查計算是否完成,或者使用 get 方法阻塞住呼叫執行緒,直到計算完成返回結果。你也可以使用 cancel 方法停止任務的執行。下面來一個栗子:

public class FutureDemo {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(10);
        Future<Integer> f = es.submit(() ->{
            Thread.sleep(10000);
            // 結果
            return 100;
        });

        // do something

        Integer result = f.get();
        System.out.println(result);

//        while (f.isDone()) {
//            System.out.println(result);
//        }
    }
}

在這個例子中,我們往執行緒池中提交了一個任務並立即返回了一個Future物件,接著可以做一些其他操作,最後利用它的 get 方法阻塞等待結果或 isDone 方法輪詢等待結果(關於Future的原理可以參考之前的文章:【併發程式設計】Future模式及JDK中的實現)

雖然這些方法提供了非同步執行任務的能力,但是對於結果的獲取卻還是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。

阻塞的方式顯然和我們的非同步程式設計的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時的得到計算結果,為什麼不能用觀察者設計模式當計算結果完成及時通知監聽者呢?

很多語言,比如Node.js,採用Callback的方式實現非同步程式設計。Java的一些框架,比如Netty,自己擴充套件了Java的 Future 介面,提供了 addListener 等多個擴充套件方法。Google的guava也提供了通用的擴充套件Future:ListenableFuture 、 SettableFuture 以及輔助類 Futures 等,方便非同步程式設計。為此,Java終於在JDK1.8這個版本中增加了一個能力更強的Future類:CompletableFuture 。它提供了非常強大的Future的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果。下面來看看這幾種方式。

Netty-Future

引入Maven依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>
public class NettyFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        EventExecutorGroup group = new DefaultEventExecutorGroup(4);
        System.out.println("開始:" + DateUtils.getNow());

        Future<Integer> f = group.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                return 100;
            }
        });

        f.addListener(new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> objectFuture) throws Exception {
                System.out.println("計算結果:" + objectFuture.get());
            }
        });

        System.out.println("結束:" + DateUtils.getNow());
        // 不讓守護執行緒退出
        new CountDownLatch(1).await();
    }
}

輸出結果:

開始:2019-05-16 08:25:40:779
結束:2019-05-16 08:25:40:788
開始耗時計算:2019-05-16 08:25:40:788
結束耗時計算:2019-05-16 08:25:50:789
計算結果:100

從結果可以看出,耗時計算結束後自動觸發Listener的完成方法,避免了主執行緒無謂的阻塞等待,那麼它究竟是怎麼做到的呢?下面看原始碼

DefaultEventExecutorGroup 實現了 EventExecutorGroup 介面,而 EventExecutorGroup 則是實現了JDK ScheduledExecutorService 介面的執行緒組介面,所以它擁有執行緒池的所有方法。然而它卻把所有返回 java.util.concurrent.Future 的方法重寫為返回 io.netty.util.concurrent.Future ,把所有返回 java.util.concurrent.ScheduledFuture 的方法重寫為返回 io.netty.util.concurrent.ScheduledFuture 。

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    /**
     * 返回一個EventExecutor
     */
    EventExecutor next();

    Iterator<EventExecutor> iterator();

    Future<?> submit(Runnable task);
    <T> Future<T> submit(Runnable task, T result);
    <T> Future<T> submit(Callable<T> task);

    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

EventExecutorGroup 的submit方法因為 newTaskFor 的重寫導致返回了netty的 Future 實現類,而這個實現類正是 PromiseTask 。

@Override
public <T> Future<T> submit(Callable<T> task) {
    return (Future<T>) super.submit(task);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new PromiseTask<T>(this, callable);
}

PromiseTask 的實現很簡單,它快取了要執行的 Callable 任務,並在run方法中完成了任務呼叫和Listener的通知。

@Override
public void run() {
    try {
        if (setUncancellableInternal()) {
            V result = task.call();
            setSuccessInternal(result);
        }
    } catch (Throwable e) {
        setFailureInternal(e);
    }
}

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

@Override
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

任務呼叫成功或者失敗都會呼叫 notifyListeners 來通知Listener,所以大家得在回撥的函式裡呼叫 isSuccess 方法來檢查狀態。

這裡有一個疑惑,會不會 Future 在呼叫 addListener 方法的時候任務已經執行完成了,這樣子會不會通知就會失敗了啊?

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    synchronized (this) {
        addListener0(listener);
    }

    if (isDone()) {
        notifyListeners();
    }

    return this;
}

可以發現,在Listener新增成功之後,會立即檢查狀態,如果任務已經完成立刻進行回撥,所以這裡不用擔心啦。OK,下面看看Guava-Future的實現。

Guava-Future

首先引入guava的Maven依賴:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>
public class GuavaFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
        ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                Thread.sleep(10000);
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                return 100;
            }
        });
        
        future.addListener(new Runnable() {
            @Override
            public void run() {
                System.out.println("呼叫成功");
            }
        }, executorService);
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

ListenableFuture 可以通過 addListener 方法增加回調函式,一般用於不在乎執行結果的地方。如果需要在執行成功時獲取結果或者執行失敗時獲取異常資訊,需要用到 Futures 工具類的 addCallback 方法:

Futures.addCallback(future, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(@Nullable Integer result) {
        System.out.println("成功,計算結果:" + result);
    }

    @Override
    public void onFailure(Throwable t) {
        System.out.println("失敗");
    }
}, executorService);

前面提到除了 ListenableFuture 外,還有一個 SettableFuture 類也支援回撥能力。它實現自 ListenableFuture ,所以擁有 ListenableFuture 的所有能力。

public class GuavaFutureDemo {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        ListenableFuture<Integer> future = submit(executorService);
        Futures.addCallback(future, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(@Nullable Integer result) {
                System.out.println("成功,計算結果:" + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("失敗:" + t.getMessage());
            }
        }, executorService);
        Thread.sleep(1000);
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }

    private static ListenableFuture<Integer> submit(Executor executor) {
        SettableFuture<Integer> future = SettableFuture.create();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始耗時計算:" + DateUtils.getNow());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束耗時計算:" + DateUtils.getNow());
                // 返回值
                future.set(100);
                // 設定異常資訊
//                future.setException(new RuntimeException("custom error!"));
            }
        });
        return future;
    }
}

看起來用法上沒有太多差別,但是有一個很容易被忽略的重要問題。當 SettableFuture 的這種方式最後呼叫了 cancel 方法後,執行緒池中的任務還是會繼續執行,而通過 submit 方法返回的 ListenableFuture 方法則會立即取消執行,這點尤其要注意。下面看看原始碼:

和Netty的Future一樣,Guava也是通過實現了自定義的 ExecutorService 實現類 ListeningExecutorService 來重寫了 submit 方法。

public interface ListeningExecutorService extends ExecutorService {
  <T> ListenableFuture<T> submit(Callable<T> task);
  ListenableFuture<?> submit(Runnable task);
  <T> ListenableFuture<T> submit(Runnable task, T result);
}

同樣的,newTaskFor 方法也被進行了重寫,返回了自定義的Future類:TrustedListenableFutureTask

@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return TrustedListenableFutureTask.create(runnable, value);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return TrustedListenableFutureTask.create(callable);
}

任務呼叫會走 TrustedFutureInterruptibleTask 的run方法:

@Override
public void run() {
    TrustedFutureInterruptibleTask localTask = task;
    if (localTask != null) {
        localTask.run();
    }
}

@Override
public final void run() {
    if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
        return; // someone else has run or is running.
    }
    try {
        // 抽象方法,子類進行重寫
        runInterruptibly();
    } finally {
        if (wasInterrupted()) {
            while (!doneInterrupting) {
                Thread.yield();
            }
        }
    }
}

最終還是呼叫到 TrustedFutureInterruptibleTask 的 runInterruptibly 方法,等待任務完成後呼叫 set 方法。

@Override
void runInterruptibly() {
    if (!isDone()) {
        try {
            set(callable.call());
        } catch (Throwable t) {
            setException(t);
        }
    }
}

protected boolean set(@Nullable V value) {
    Object valueToSet = value == null ? NULL : value;
    // CAS設定值
    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
        complete(this);
        return true;
    }
    return false;
}

在 complete 方法的最後會獲取到Listener進行回撥。

上面提到的 SettableFuture 和 ListenableFuture 的 cancel 方法效果不同,原因在於一個重寫了 afterDone 方法而一個沒有。

下面是 ListenableFuture 的 afterDone 方法:

@Override
protected void afterDone() {
    super.afterDone();

    if (wasInterrupted()) {
        TrustedFutureInterruptibleTask localTask = task;
        if (localTask != null) {
            localTask.interruptTask();
        }
    }

    this.task = null;
}

wasInterrupted 用來判斷是否呼叫了 cancel (cancel方法會設定一個取消物件Cancellation到value中)

protected final boolean wasInterrupted() {
    final Object localValue = value;
    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
}

interruptTask 方法通過執行緒的 interrupt 方法真正取消執行緒任務的執行:

final void interruptTask() {
    Thread currentRunner = runner;
    if (currentRunner != null) {
        currentRunner.interrupt();
    }
    doneInterrupting = true;
}

 

由 Callback Hell 引出 Promise 模式

如果你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,如果你對前端不熟悉,也不要緊,我們先來看看回調地獄(Callback Hell)是個什麼概念。

回撥是一種我們推崇的非同步呼叫方式,但也會遇到問題,也就是回撥的巢狀。當需要多個非同步回撥一起書寫時,就會出現下面的程式碼(以 js 為例):

asyncFunc1(opt, (...args1) => { 
  asyncFunc2(opt, (...args2) => {       
    asyncFunc3(opt, (...args3) => {            
      asyncFunc4(opt, (...args4) => {
          // some operation
      });
    });
  });
});

雖然在 JAVA 業務程式碼中很少出現回撥的多層巢狀,但總歸是個問題,這樣的程式碼不易讀,巢狀太深修改也麻煩。於是 ES6 提出了 Promise 模式來解決回撥地獄的問題。可能就會有人想問:java 中存在 Promise 模式嗎?答案是肯定的。

前面提到了 Netty 和 Guava 的擴充套件都提供了 addListener 這樣的介面,用於處理 Callback 呼叫,但其實 jdk1.8 已經提供了一種更為高階的回撥方式:CompletableFuture。首先嚐試用 CompletableFuture 來重寫上面回撥的問題。

public class CompletableFutureTest {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("開始:" + DateUtils.getNow());
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("開始耗時計算:" + DateUtils.getNow());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結束耗時計算:" + DateUtils.getNow());
            return 100;
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("回撥結果:" + result);
        });
        System.out.println("結束:" + DateUtils.getNow());
        new CountDownLatch(1).await();
    }
}

使用CompletableFuture耗時操作沒有佔用主執行緒的時間片,達到了非同步呼叫的效果。我們也不需要引入任何第三方的依賴,這都是依賴於 java.util.concurrent.CompletableFuture 的出現。CompletableFuture 提供了近 50 多個方法,大大便捷了 java 多執行緒操作,和非同步呼叫的寫法。

使用 CompletableFuture 解決回撥地獄問題:

public class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException {
        long l = System.currentTimeMillis();
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("在回撥中執行耗時操作...");
            Thread.sleep(10000);
            return 100;
        });
        completableFuture = completableFuture.thenCompose(i -> {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("在回撥的回撥中執行耗時操作...");
                Thread.sleep(10000);
                return i + 100;
            });
        });
        completableFuture.whenComplete((result, e) -> {
            System.out.println("計算結果:" + result);
        });
        System.out.println("主執行緒運算耗時:" + (System.currentTimeMillis() - l) + " ms");
        new CountDownLatch(1).await();
    }
}

輸出:

在回撥中執行耗時操作...主執行緒運算耗時:58 ms在回撥的回撥中執行耗時操作...計算結果:200

使用 thenCompose 或者 thenComposeAsync 等方法可以實現回撥的回撥,且寫出來的方法易於維護。

 

總的看來,為Future模式增加回調功能就不需要阻塞等待結果的返回並且不需要消耗無謂的CPU資源去輪詢處理狀態,JDK8之前使用Netty或者Guava提供的工具類,JDK8之後則可以使用自帶的 CompletableFuture 類。Future 有兩種模式:將來式和回撥式。而回調式會出現回撥地獄的問題,由此衍生出了 Promise 模式來解決這個問題。這才是 Future 模式和 Promise 模式的相關性。

&n