1. 程式人生 > >Java併發程式設計-擴充套件可回撥的Future

Java併發程式設計-擴充套件可回撥的Future

前提

最近在看JUC執行緒池java.util.concurrent.ThreadPoolExecutor的原始碼實現,其中瞭解到java.util.concurrent.Future的實現原理。從目前java.util.concurrent.Future的實現來看,雖然實現了非同步提交任務,但是任務結果的獲取過程需要主動呼叫Future#get()或者Future#get(long timeout, TimeUnit unit),而前者是阻塞的,後者在非同步任務執行時間不確定的情況下有可能需要進行輪詢,這兩種情況和非同步呼叫的初衷有點相違背。於是筆者想結合目前瞭解到的Future實現原理的前提下擴展出支援(監聽)回撥的Future

,思路上參考了Guava增強的ListenableFuture。本文編寫的時候使用的JDK是JDK11,其他版本可能不適合。

簡單分析Future的實現原理

虛擬例子推演

併發大師Doug Lea在設計JUC執行緒池的時候,提供了一個頂層執行器介面Executor

public interface Executor {

    void execute(Runnable command);
}    

實際上,這裡定義的方法Executor#execute()是整套執行緒池體系最核心的介面,也就是ThreadPoolExecutor定義的核心執行緒、額外建立的執行緒(執行緒池最大執行緒容量 - 核心執行緒數)都是在這個介面提交任務的時候懶建立的,也就是說ExecutorService

介面擴充套件的功能都是基於Executor#execute()的基礎進行擴充套件。Executor#execute()方法只是單純地把任務例項Runnable物件投放到執行緒池中分配合適的執行緒執行,但是由於方法返回值是void型別,我們是無法感知任務什麼時候執行完畢。這個時候就需要對Runnable任務例項進行包裝(下面是虛擬碼 + 偽邏輯):

// 下面這個Wrapper和Status類是筆者虛構出來
@RequiredArgsConstructor
class Wrapper implements Runnable{

    private final Runnable target;
    private Status status = Status.of("初始化");

    @Override
    public void run(){
        try{
           target.run();
           status = Status.of("執行成功");
        }catch(Throwable t){
           status = Status.of("執行異常"); 
        }
    }
}

我們只需要把new Wrapper(原始Runnable例項)投放到執行緒池執行,那麼通過定義好的Status狀態記錄變數就能得知非同步任務執行的狀態,以及什麼時候執行完畢(包括正常的執行完畢和異常的執行完畢)。這裡僅僅解決了任務執行的狀態獲取,但是Executor#execute()方法法返回值是void型別的特點使得我們無法回撥Runnable物件執行的結果。這個時候需要定義一個可以回撥執行結果的介面,其實已經有現成的介面Callable

@FunctionalInterface
public interface Callable<V> {

    V call() throws Exception;
}    

這裡遇到了一個問題:由於Executor#execute()只接收Runnable引數,我們需要把Callable介面適配到Runnable介面,這個時候,做一次簡單的委託即可:

@RequiredArgsConstructor
class Wrapper implements Runnable{

    private final Callable callable;
    private Status status = Status.of("初始化");
    @Getter
    private Object outcome;

    @Override
    public void run(){
        try{
           outcome = callable.call();
           status = Status.of("執行成功");
        }catch(Throwable t){
           status = Status.of("執行異常"); 
           outcome = t;
        }
    }
}

這裡把Callable例項直接委託給Wrapper,而Wrapper實現了Runnable介面,執行結果直接存放在定義好的Object型別的物件outcome中即可。當我們感知到執行狀態已經結束,就可以從outcome中提取到執行結果。

Future的實現

上面一個小結僅僅對Future實現做一個相對合理的虛擬推演,實際上,RunnableFuture才是JUC中常用的複合介面,它同時實現了RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    
    void run();
}

上一節提到的虛構出來的Wrapper類,在JUC中類似的實現是java.util.concurrent.FutureTask,它就是CallableRunnable的介面卡,FutureTask實現了RunnableFuture介面:

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    // 省略其他程式碼
}    

注意到核心屬性state表示執行狀態,outcome承載執行結果。接著看提交Callable型別任務的方法ExecutorService#submit()

public interface ExecutorService extends Executor {

    // 省略其他介面方法

    <T> Future<T> submit(Callable<T> task);
}    

當我們通過上述ExecutorService#submit()方法提交Callable型別任務的時候,實際上做了如下的步驟:

  1. 檢查入參task的存在性,如果為null丟擲NullPointerException
  2. Callable型別的task包裝為FutureTask例項。
  3. 把新建的FutureTask例項放到執行緒池中執行,也就是呼叫Executor#execute(FutureTask例項)
  4. 返回FutureTask例項的介面例項RunnableFuture(實際上是返回子介面Future例項)。

如果我們需要獲取結果,可以Future#get()或者Future#get(long timeout, TimeUnit unit)獲取,呼叫這兩個方法的時候參看FutureTask裡面的方法實現,得知步驟如下:

  1. 如果狀態state小於等於COMPLETING(1),說明任務還在執行中,獲取結果的請求執行緒會放入WaitNode型別的佇列中進行阻塞。
  2. 如果任務執行完畢,不管異常完畢還是正常完畢,除了會更新狀態state和把結果賦值到outcome之外,還會喚醒所有阻塞獲取結果的執行緒,然後呼叫鉤子方法FutureTask#done()(具體見原始碼FutureTask#finishCompletion())。

其實分析了這麼多,筆者想指出的結論就是:Callable型別任務提交到執行緒池中執行完畢(包括正常執行完畢和異常執行完畢)之後,都會回撥鉤子方法FutureTask#done()。這個就是我們擴充套件可監聽Future的理論依據。

擴充套件可回撥的Future

先做一次編碼實現,再簡單測試其功能。

編碼實現

先定義一個Future介面的子介面ListenableFuture,用於新增可監聽的回撥:

public interface ListenableFuture<V> extends Future<V> {

    void addCallback(ListenableFutureCallback<V> callback, Executor executor);
}

ListenableFutureCallback是一個函式式回撥介面:

@FunctionalInterface
public interface ListenableFutureCallback<V> {

    void callback(V value, Throwable throwable);
}

對於ListenableFutureCallback而言,回撥的結果valuethrowable是互斥的。正常執行完畢的情況下value將會是執行結果值,throwablenull;異常執行完畢的情況下,value將會是nullthrowable將會是丟擲的異常例項。如果更習慣於分開處理正常執行完畢的結果和異常執行完畢的結果,ListenableFutureCallback可以這樣定義:

public interface ListenableFutureCallback<V> {

    void onSuccess(V value);

    void onError(Throwable throwable);
}

接著定義ListenableExecutorService介面繼承ExecutorService介面:

public interface ListenableExecutorService extends ExecutorService {

    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable);

    /**
     * 定義這個方法是因為有些時候由於任務執行時間非常短,有可能通過返回的ListenableFuture例項添加回調之前已經執行完畢,因此可以支援顯式傳入回撥
     *
     * @param callable  callable
     * @param callbacks callbacks
     * @param executor  executor
     * @return ListenableFuture
     */
    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor);
}

然後新增一個執行單元介面卡ListenableFutureCallbackRunnable,承載每次回撥觸發的呼叫(實現Runnable介面,從而支援非同步執行):

@RequiredArgsConstructor
public class ListenableFutureCallbackRunnable<V> implements Runnable {

    private final ListenableFutureCallback<V> callback;
    private final V value;
    private final Throwable throwable;

    @Override
    public void run() {
        callback.callback(value, throwable);
    }
}

接著需要定義一個FutureTask的子類ListenableFutureTask,核心邏輯是覆蓋FutureTask#done()方法觸發回撥:

// ListenableFutureTask
public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {

    private final List<Execution<V>> executions = new ArrayList<>();

    public ListenableFutureTask(Callable<V> callable) {
        super(callable);
    }

    public ListenableFutureTask(Runnable runnable, V result) {
        super(runnable, result);
    }

    public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) {
        return new ListenableFutureTask<>(callable);
    }

    @Override
    protected void done() {
        Iterator<Execution<V>> iterator = executions.iterator();
        Throwable throwable = null;
        V value = null;
        try {
            value = get();
        } catch (Throwable t) {
            throwable = t;
        }
        while (iterator.hasNext()) {
            Execution<V> execution = iterator.next();
            ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),
                    value, throwable);
            // 非同步回撥
            if (null != execution.getExecutor()) {
                execution.getExecutor().execute(callbackRunnable);
            } else {
                // 同步回撥
                callbackRunnable.run();
            }
        }
    }

    @Override
    public void addCallback(ListenableFutureCallback<V> callback, Executor executor) {
        Execution<V> execution = new Execution<>();
        execution.setCallback(callback);
        execution.setExecutor(executor);
        executions.add(execution);
    }
}

// Execution - 承載每個回撥例項和對應的Executor,Executor例項為null則進行同步回撥
@Data
public class Execution <V>{

    private Executor executor;
    private ListenableFutureCallback<V> callback;
}

最後一步就是編寫執行緒池ListenableThreadPoolExecutor,繼承自ThreadPoolExecutor並且實現ListenableExecutorService介面:

public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService {

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) {
        if (null == callable) {
            throw new IllegalArgumentException("callable");
        }
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        execute(listenableFutureTask);
        return listenableFutureTask;
    }

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) {
        if (null == callable) {
            throw new IllegalArgumentException("callable");
        }
        if (null == callbacks) {
            throw new IllegalArgumentException("callbacks");
        }
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        for (ListenableFutureCallback<T> callback : callbacks) {
            listenableFutureTask.addCallback(callback, executor);
        }
        execute(listenableFutureTask);
        return listenableFutureTask;
    }
}

測試

引入junit,編寫測試類如下:

public class ListenableFutureTest {

    private static ListenableExecutorService EXECUTOR;
    private static Executor E;

    @BeforeClass
    public static void before() {
        EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10), new ThreadFactory() {

            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement()));
                return thread;
            }
        });
        E = Executors.newFixedThreadPool(3);
    }

    @Test
    public void testListenableFuture1() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            return "message";
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, null);
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture2() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            throw new RuntimeException("exception");
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, null);
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture3() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            return "message";
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, E);
        System.out.println("testListenableFuture3 end...");
        Thread.sleep(2000);
    }

    @Test
    public void testListenableFuture4() throws Exception {
        ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
            Thread.sleep(1000);
            throw new RuntimeException("exception");
        });
        future.addCallback((v, t) -> {
            System.out.println(String.format("Value = %s,Throwable = %s", v, t));
        }, E);
        System.out.println("testListenableFuture4 end...");
        Thread.sleep(2000);
    }
}

執行結果:

// testListenableFuture1
Value = message,Throwable = null

// testListenableFuture2
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

// testListenableFuture3
testListenableFuture3 end...
Value = message,Throwable = null

// testListenableFuture4
testListenableFuture4 end...
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception

和預期的結果一致,注意一下如果Callable執行丟擲異常,異常被包裝為ExecutionException,要呼叫Throwable#getCause()才能得到原始的異常例項。

小結

本文通過了解ThreadPoolExecutorFuture的實現原理做簡單的擴充套件,使得非同步提交任務變得更加優雅和簡便。強化了動手能力的同時,也能加深對併發程式設計的一些認知。當然,本文只是提供一個十分簡陋的實現,筆者其實還想到了如對回撥處理的耗時做監控、回撥打上分組標籤執行等等更完善的功能,等到有需要的場景再進行實現。

這裡記錄一下過程中的一些領悟:

  • Executor#execute()是執行緒池的核心介面,所有其他功能都是基於此介面做擴充套件,它的設計本身是無狀態的。
  • 靈活使用介面卡模式,可以在不改變已釋出的介面的功能同時實現新的介面的功能適配。
  • 要善於發掘和使用JDK類庫設計者留給開發者的擴充套件介面。

個人部落格

  • Throwable's Blog

(本文完 c-1-d e-a-20190702