Java併發程式設計-擴充套件可回撥的Future
前提
最近在看JUC執行緒池java.util.concurrent.ThreadPoolExecutor
的原始碼實現,其中瞭解到java.util.concurrent.Future
的實現原理。從目前java.util.concurrent.Future
的實現來看,雖然實現了非同步提交任務,但是任務結果的獲取過程需要主動呼叫Future#get()
或者Future#get(long timeout, TimeUnit unit)
,而前者是阻塞的,後者在非同步任務執行時間不確定的情況下有可能需要進行輪詢,這兩種情況和非同步呼叫的初衷有點相違背。於是筆者想結合目前瞭解到的Future
實現原理的前提下擴展出支援(監聽)回撥的Future
Guava
增強的ListenableFuture
。本文編寫的時候使用的JDK是JDK11,其他版本可能不適合。
簡單分析Future的實現原理
虛擬例子推演
併發大師Doug Lea在設計JUC執行緒池的時候,提供了一個頂層執行器介面Executor
:
public interface Executor {
void execute(Runnable command);
}
實際上,這裡定義的方法Executor#execute()
是整套執行緒池體系最核心的介面,也就是ThreadPoolExecutor
定義的核心執行緒、額外建立的執行緒(執行緒池最大執行緒容量 - 核心執行緒數)都是在這個介面提交任務的時候懶建立的,也就是說ExecutorService
Executor#execute()
的基礎進行擴充套件。Executor#execute()
方法只是單純地把任務例項Runnable
物件投放到執行緒池中分配合適的執行緒執行,但是由於方法返回值是void
型別,我們是無法感知任務什麼時候執行完畢。這個時候就需要對Runnable
任務例項進行包裝(下面是虛擬碼 + 偽邏輯):
// 下面這個Wrapper和Status類是筆者虛構出來 @RequiredArgsConstructor class Wrapper implements Runnable{ private final Runnable target; private Status status = Status.of("初始化"); @Override public void run(){ try{ target.run(); status = Status.of("執行成功"); }catch(Throwable t){ status = Status.of("執行異常"); } } }
我們只需要把new Wrapper(原始Runnable例項)
投放到執行緒池執行,那麼通過定義好的Status
狀態記錄變數就能得知非同步任務執行的狀態,以及什麼時候執行完畢(包括正常的執行完畢和異常的執行完畢)。這裡僅僅解決了任務執行的狀態獲取,但是Executor#execute()
方法法返回值是void
型別的特點使得我們無法回撥Runnable
物件執行的結果。這個時候需要定義一個可以回撥執行結果的介面,其實已經有現成的介面Callable
:
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
這裡遇到了一個問題:由於Executor#execute()
只接收Runnable
引數,我們需要把Callable
介面適配到Runnable
介面,這個時候,做一次簡單的委託即可:
@RequiredArgsConstructor
class Wrapper implements Runnable{
private final Callable callable;
private Status status = Status.of("初始化");
@Getter
private Object outcome;
@Override
public void run(){
try{
outcome = callable.call();
status = Status.of("執行成功");
}catch(Throwable t){
status = Status.of("執行異常");
outcome = t;
}
}
}
這裡把Callable
例項直接委託給Wrapper
,而Wrapper
實現了Runnable
介面,執行結果直接存放在定義好的Object
型別的物件outcome
中即可。當我們感知到執行狀態已經結束,就可以從outcome
中提取到執行結果。
Future的實現
上面一個小結僅僅對Future
實現做一個相對合理的虛擬推演,實際上,RunnableFuture
才是JUC中常用的複合介面,它同時實現了Runnable
和Future
:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
上一節提到的虛構出來的Wrapper
類,在JUC中類似的實現是java.util.concurrent.FutureTask
,它就是Callable
和Runnable
的介面卡,FutureTask
實現了RunnableFuture
介面:
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 省略其他程式碼
}
注意到核心屬性state
表示執行狀態,outcome
承載執行結果。接著看提交Callable
型別任務的方法ExecutorService#submit()
:
public interface ExecutorService extends Executor {
// 省略其他介面方法
<T> Future<T> submit(Callable<T> task);
}
當我們通過上述ExecutorService#submit()
方法提交Callable
型別任務的時候,實際上做了如下的步驟:
- 檢查入參
task
的存在性,如果為null
丟擲NullPointerException
。 - 把
Callable
型別的task
包裝為FutureTask
例項。 - 把新建的
FutureTask
例項放到執行緒池中執行,也就是呼叫Executor#execute(FutureTask例項)
。 - 返回
FutureTask
例項的介面例項RunnableFuture
(實際上是返回子介面Future
例項)。
如果我們需要獲取結果,可以Future#get()
或者Future#get(long timeout, TimeUnit unit)
獲取,呼叫這兩個方法的時候參看FutureTask
裡面的方法實現,得知步驟如下:
- 如果狀態
state
小於等於COMPLETING(1)
,說明任務還在執行中,獲取結果的請求執行緒會放入WaitNode
型別的佇列中進行阻塞。 - 如果任務執行完畢,不管異常完畢還是正常完畢,除了會更新狀態
state
和把結果賦值到outcome
之外,還會喚醒所有阻塞獲取結果的執行緒,然後呼叫鉤子方法FutureTask#done()
(具體見原始碼FutureTask#finishCompletion()
)。
其實分析了這麼多,筆者想指出的結論就是:Callable
型別任務提交到執行緒池中執行完畢(包括正常執行完畢和異常執行完畢)之後,都會回撥鉤子方法FutureTask#done()
。這個就是我們擴充套件可監聽Future
的理論依據。
擴充套件可回撥的Future
先做一次編碼實現,再簡單測試其功能。
編碼實現
先定義一個Future
介面的子介面ListenableFuture
,用於新增可監聽的回撥:
public interface ListenableFuture<V> extends Future<V> {
void addCallback(ListenableFutureCallback<V> callback, Executor executor);
}
ListenableFutureCallback
是一個函式式回撥介面:
@FunctionalInterface
public interface ListenableFutureCallback<V> {
void callback(V value, Throwable throwable);
}
對於ListenableFutureCallback
而言,回撥的結果value
和throwable
是互斥的。正常執行完畢的情況下value
將會是執行結果值,throwable
為null
;異常執行完畢的情況下,value
將會是null
,throwable
將會是丟擲的異常例項。如果更習慣於分開處理正常執行完畢的結果和異常執行完畢的結果,ListenableFutureCallback
可以這樣定義:
public interface ListenableFutureCallback<V> {
void onSuccess(V value);
void onError(Throwable throwable);
}
接著定義ListenableExecutorService
介面繼承ExecutorService
介面:
public interface ListenableExecutorService extends ExecutorService {
<T> ListenableFuture<T> listenableSubmit(Callable<T> callable);
/**
* 定義這個方法是因為有些時候由於任務執行時間非常短,有可能通過返回的ListenableFuture例項添加回調之前已經執行完畢,因此可以支援顯式傳入回撥
*
* @param callable callable
* @param callbacks callbacks
* @param executor executor
* @return ListenableFuture
*/
<T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor);
}
然後新增一個執行單元介面卡ListenableFutureCallbackRunnable
,承載每次回撥觸發的呼叫(實現Runnable
介面,從而支援非同步執行):
@RequiredArgsConstructor
public class ListenableFutureCallbackRunnable<V> implements Runnable {
private final ListenableFutureCallback<V> callback;
private final V value;
private final Throwable throwable;
@Override
public void run() {
callback.callback(value, throwable);
}
}
接著需要定義一個FutureTask
的子類ListenableFutureTask
,核心邏輯是覆蓋FutureTask#done()
方法觸發回撥:
// ListenableFutureTask
public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
private final List<Execution<V>> executions = new ArrayList<>();
public ListenableFutureTask(Callable<V> callable) {
super(callable);
}
public ListenableFutureTask(Runnable runnable, V result) {
super(runnable, result);
}
public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) {
return new ListenableFutureTask<>(callable);
}
@Override
protected void done() {
Iterator<Execution<V>> iterator = executions.iterator();
Throwable throwable = null;
V value = null;
try {
value = get();
} catch (Throwable t) {
throwable = t;
}
while (iterator.hasNext()) {
Execution<V> execution = iterator.next();
ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),
value, throwable);
// 非同步回撥
if (null != execution.getExecutor()) {
execution.getExecutor().execute(callbackRunnable);
} else {
// 同步回撥
callbackRunnable.run();
}
}
}
@Override
public void addCallback(ListenableFutureCallback<V> callback, Executor executor) {
Execution<V> execution = new Execution<>();
execution.setCallback(callback);
execution.setExecutor(executor);
executions.add(execution);
}
}
// Execution - 承載每個回撥例項和對應的Executor,Executor例項為null則進行同步回撥
@Data
public class Execution <V>{
private Executor executor;
private ListenableFutureCallback<V> callback;
}
最後一步就是編寫執行緒池ListenableThreadPoolExecutor
,繼承自ThreadPoolExecutor
並且實現ListenableExecutorService
介面:
public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService {
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) {
if (null == callable) {
throw new IllegalArgumentException("callable");
}
ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
execute(listenableFutureTask);
return listenableFutureTask;
}
@Override
public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) {
if (null == callable) {
throw new IllegalArgumentException("callable");
}
if (null == callbacks) {
throw new IllegalArgumentException("callbacks");
}
ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
for (ListenableFutureCallback<T> callback : callbacks) {
listenableFutureTask.addCallback(callback, executor);
}
execute(listenableFutureTask);
return listenableFutureTask;
}
}
測試
引入junit
,編寫測試類如下:
public class ListenableFutureTest {
private static ListenableExecutorService EXECUTOR;
private static Executor E;
@BeforeClass
public static void before() {
EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10), new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement()));
return thread;
}
});
E = Executors.newFixedThreadPool(3);
}
@Test
public void testListenableFuture1() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
return "message";
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, null);
Thread.sleep(2000);
}
@Test
public void testListenableFuture2() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
throw new RuntimeException("exception");
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, null);
Thread.sleep(2000);
}
@Test
public void testListenableFuture3() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
return "message";
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, E);
System.out.println("testListenableFuture3 end...");
Thread.sleep(2000);
}
@Test
public void testListenableFuture4() throws Exception {
ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> {
Thread.sleep(1000);
throw new RuntimeException("exception");
});
future.addCallback((v, t) -> {
System.out.println(String.format("Value = %s,Throwable = %s", v, t));
}, E);
System.out.println("testListenableFuture4 end...");
Thread.sleep(2000);
}
}
執行結果:
// testListenableFuture1
Value = message,Throwable = null
// testListenableFuture2
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
// testListenableFuture3
testListenableFuture3 end...
Value = message,Throwable = null
// testListenableFuture4
testListenableFuture4 end...
Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
和預期的結果一致,注意一下如果Callable
執行丟擲異常,異常被包裝為ExecutionException
,要呼叫Throwable#getCause()
才能得到原始的異常例項。
小結
本文通過了解ThreadPoolExecutor
和Future
的實現原理做簡單的擴充套件,使得非同步提交任務變得更加優雅和簡便。強化了動手能力的同時,也能加深對併發程式設計的一些認知。當然,本文只是提供一個十分簡陋的實現,筆者其實還想到了如對回撥處理的耗時做監控、回撥打上分組標籤執行等等更完善的功能,等到有需要的場景再進行實現。
這裡記錄一下過程中的一些領悟:
Executor#execute()
是執行緒池的核心介面,所有其他功能都是基於此介面做擴充套件,它的設計本身是無狀態的。- 靈活使用介面卡模式,可以在不改變已釋出的介面的功能同時實現新的介面的功能適配。
- 要善於發掘和使用JDK類庫設計者留給開發者的擴充套件介面。
個人部落格
- Throwable's Blog
(本文完 c-1-d e-a-20190702