1. 程式人生 > 實用技巧 >Future設計模式及ExecutorService中的submit提交

Future設計模式及ExecutorService中的submit提交

Future設計模式提供一種非同步執行方式。提供這樣一個場景,我們進行一個查詢呼叫,該呼叫耗時時間長,因此當前執行緒不得不一直阻塞,直到拿到結果。這種情況下,就可以使用非同步執行的方式,它會在進行呼叫時直接返回一個“憑證”,當前執行緒可以在後續某個時刻通過該“憑證”拿到查詢結果。

一、Future設計模式實現

圖1.1 Future設計模式UML圖

1.1 介面定義

為實現非同步執行,我們需要將任務,任務結果解耦合。這至少需要三個部分,簡單地,我們定義三個介面Future、Callable和FutureService來完成這一過程。

1.1.1 Future介面

Future提供獲取執行結果和判斷任務是否完成的兩個介面,實際上它的實現類是對執行結果的一種封裝。

  1 public interface Future<T> {
  2 
  3     //返回計算結果
  4     T get() throws InterruptedException;
  5 
  6     //任務是否已經執行完成
  7     void finished(T result);
  8 
  9 }
Future

1.1.2 Callable介面

Callable主要提供給呼叫者實現計算邏輯,並返回最終的計算結果。該介面實際作用與Runnable類似,都是為那些其例項可能被另一個執行緒執行的類設計,是對任務的一種封裝。但是Runnable不會返回結果,並且無法丟擲經過檢查的異常。

  1 public interface Callable<T> {
  2 
  3     T call() throws Exception;
  4 
  5 }
  6 
Callable

1.1.3 FutureService介面

FutureService主要用於提交任務,它對Future和Callable進行耦合,將執行結果賦予Future,否則任務和結果就無法產生聯絡。這裡提供兩種方式,一種是Callable,另一種是Runnable提交,實際上區別不大。

  1 public interface FutureService<T> {
  2
3 //提交Runnable任務,則Future.get返回null 4 Future<?> submit(Runnable runnable); 5 6 Future<T> submit(Callable<T> callable); 7 8 }
FutureService

1.2 程式實現

1.2.1 FutureIml

FutureIml實現了接收任務執行結果的方法和任務結果提取的方法,在get結果時,若任務還未執行結束,該方法陷入阻塞(可比較producer/consumer的實現)。

  1 public class FutureIml<T> implements Future<T> {
  2 
  3     private T result;
  4 
  5     private boolean isDone = false;
  6 
  7     private ReentrantLock lock = new ReentrantLock();
  8 
  9     private Condition condition = lock.newCondition();
 10 
 11     @Override
 12     public T get() throws InterruptedException {
 13         lock.lock();
 14         try{
 15             while (!isDone){
 16                 condition.await();
 17             }
 18             return result;
 19         }finally {
 20             lock.unlock();
 21         }
 22     }
 23 
 24     @Override
 25     public void finished(T result) {
 26         lock.lock();
 27         try {
 28             this.result = result;
 29             this.isDone = true;
 30             condition.signalAll();
 31         } finally {
 32             lock.unlock();
 33         }
 34     }
 35 }
FutureIml

1.2.2 FutureServiceIml

FutureServiceIml實現了對任務和結果的聯絡,可以看到,這種非同步的實現是基於多執行緒的,另起一個新執行緒執行任務,並在任務結束時將結果傳給Future。

  1 public class FutureServiceIml<T> implements FutureService<T>{
  2 
  3     private final static String PRIFIX = "FUTURE-";
  4 
  5     private AtomicInteger integer = new AtomicInteger();
  6 
  7     private String getName(){
  8         return PRIFIX+integer.getAndIncrement();
  9     }
 10     @Override
 11     public Future<?> submit(Runnable runnable) {
 12         final FutureIml<Void> future = new FutureIml<>();
 13         new Thread(()->{
 14             runnable.run();
 15             future.finished(null);
 16         }, getName()).start();
 17         return future;
 18     }
 19 
 20     @Override
 21     public Future<T> submit(Callable<T> callable) {
 22         final FutureIml<T> future = new FutureIml<>();
 23         new Thread(()->{
 24             try {
 25                 T result = callable.call();
 26                 future.finished(result);
 27             } catch (Exception e) {
 28                 e.printStackTrace();
 29             }
 30         }, getName()).start();
 31         return future;
 32     }
 33 }
FutureServiceIml

測試結果:

  1 public class FutureTest {
  2     public static void main(String[] args) throws InterruptedException {
  3         FutureService service = new FutureServiceIml();
  4         Future future = service.submit(new Callable<String>() {
  5             @Override
  6             public String call() throws Exception {
  7                 return getString();
  8             }
  9         });
 10 
 11         System.out.println("Not blocked");
 12 
 13         System.out.println(future.get());
 14     }
 15 
 16     public static String getString() throws InterruptedException {
 17         TimeUnit.SECONDS.sleep(3);
 18         return "FINISHED";
 19     }
 20 }
FutureTest

1.2.3 增加回調機制

Future很明視訊記憶體在一個弊端,即get方法會阻塞。這就會造成當呼叫submit後,立刻呼叫get,則該阻塞與不使用Future模式的情況幾乎相同了。這裡的解決辦法是回撥機制,它可以讓呼叫者不再進行顯示地通過get方式獲得資料而導致阻塞。

Future<T> submit(Callable<T> callable, Callback<T> callback);

我們將回調介面在任務提交時作為引數注入,這個Callback(JDK1.8)主要用來接受並處理任務的計算結果,當提交的任務執行完成之後,會將結果傳遞給Callback介面進行進一步的執行,這樣在提交任務之後不再會因為通過get方法獲得結果而陷入阻塞。

二、ExexecutorService中的submit

和們實現的簡單的Future模式一樣,ExecutorService中也包含三個部分,並且它是線上程池併發框架下應用的,因此還對Runnable介面進行了適配。

2.1 Callable與Future

Callable對任務進行了封裝,但Future提供了更豐富的細節,除了可以從中拿到執行結果外,還可以實現對任務的控制部分控制(取消任務)。

  1 public interface Callable<V> {
  2     V call() throws Exception;
  3 }
  4 
  5 public interface Future<V> {
  6 
  7     boolean cancel(boolean mayInterruptIfRunning);
  8 
  9     boolean isCancelled();
 10 
 11     boolean isDone();
 12 
 13     V get() throws InterruptedException, ExecutionException;
 14 
 15     V get(long timeout, TimeUnit unit)
 16         throws InterruptedException, ExecutionException, TimeoutException;
 17 }
Callable And Future

2.2 介面卡

Callable和Runnable類似,是為那些其例項可能被另一個執行緒執行的類設計,是對任務的封裝。而執行緒只能接收Runnable任務,線上程池中最終的任務提交介面,execute方法只接收Runnable任務,因此我們需要兼顧Runnable、Callable和Future。

在我們自己的實現中,Callable被放在run方法下執行。線上程池中提供了更嚴謹的設計,它使用了兩個介面卡RunnableFuture和RunnableAdapter。

  1 public interface RunnableFuture<V> extends Runnable, Future<V> {
  2     void run();
  3 }
RunnableFuture

RunnableFuture是對Runnable介面和Future介面的適配,表示可以被控制狀態的Runnable。通過實現這個介面,使得非同步任務也可以通過excute方法提交。

  1 static final class RunnableAdapter<T> implements Callable<T> {
  2 	final Runnable task;
  3 	final T result;
  4 	RunnableAdapter(Runnable task, T result) {
  5 		this.task = task;
  6 		this.result = result;
  7 	}
  8 	public T call() {
  9 		task.run();
 10 		return result;
 11 	}
 12 }
RunnableAdapter

RunnableAdapter是對Runnable和Callalbe的適配,實現了Callable介面,並聚合了Runnable物件。這個實現的主要功能是將Runnable轉化為可以非同步執行的任務,但這種方式Runnable也並不會給出任務執行結果,result值是使用者傳入的。(這裡將Callable與Runnable進行適配,當然不用適配也可以做到非同步執行,但是需要新的類實現該功能,因此很沒必要)。

2.3 FutureTask

我們已經瞭解了Callable、Future以及介面卡,現在需要一個類似FutureService,對任務與結果進行整理,這就是FutureTask的主要工作。FutureTask實現了RunnableFuture介面,當execute方法提交該非同步任務時,最終執行的是FutureTask類中的run方法。

  1 public void run() {
  2 	if (state != NEW ||
  3 	     !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
  4 	     return;
  5 	try {
  6 		Callable<V> c = callable;
  7 		if (c != null && state == NEW) {
  8 			V result;
  9 			boolean ran;
 10 			try {
 11 				result = c.call();
 12 				ran = true;
 13 			} catch (Throwable ex) {
 14 				result = null;
 15 				ran = false;
 16 				setException(ex);
 17 			}
 18 			if (ran)
 19 				set(result);
 20 		}
 21 	} finally {
 22 		//在狀態設定之前,runner必須為非null,以防止對run()的併發呼叫
 23 		runner = null;
 24 
 25 		//為防止洩漏中斷,必須在清空執行程式後重新讀取狀態
 26 		int s = state;
 27 		if (s >= INTERRUPTING)
 28 			handlePossibleCancellationInterrupt(s);
 29 	}
 30 }
run

run方法中的工作與我們自實現的方法類似,主要是在run方法中執行call方法,並把執行結果交給Future。除此之外FutureTask還有一些需要關注的點。

  1 private Callable<V> callable;
  2 private Object outcome;
  3 private volatile Thread runner;
  4 private volatile WaitNode waiters;
field

這四個域分別是任務,結果,執行執行緒和一個連結串列。這個WaitNode是用於在多個執行緒get結果被阻塞時,用於儲存這些阻塞的執行緒,類中還有具體的阻塞喚醒方法,不做贅述。

2.4 整體執行過程

整體執行過程從ExecutorService的submit方法看,它將要執行的非同步任務進一步封裝進實現了Runnable介面的FutureTask中,使之能被excute提交。通過excete提交後將Future返回,通過Future拿取執行結果。

使用執行緒池本身就是非同步的,但是Runnable介面不能返回結果,通過Future設計模式,也就曲線救國了。

  1 public <T> Future<T> submit(Callable<T> task) {
  2 	if (task == null) throw new NullPointerException();
  3 	RunnableFuture<T> ftask = newTaskFor(task);
  4 	execute(ftask);
  5 	return ftask;
  6 }
  7 
  8 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  9 	return new FutureTask<T>(callable);
 10 }
 11 
 12 public FutureTask(Callable<V> callable) {
 13 	if (callable == null)
 14 		throw new NullPointerException();
 15 	this.callable = callable;
 16 	this.state = NEW;
 17 }
submit