Future設計模式及ExecutorService中的submit提交
Future設計模式提供一種非同步執行方式。提供這樣一個場景,我們進行一個查詢呼叫,該呼叫耗時時間長,因此當前執行緒不得不一直阻塞,直到拿到結果。這種情況下,就可以使用非同步執行的方式,它會在進行呼叫時直接返回一個“憑證”,當前執行緒可以在後續某個時刻通過該“憑證”拿到查詢結果。
一、Future設計模式實現
圖1.1 Future設計模式UML圖
1.1 介面定義
為實現非同步執行,我們需要將任務,任務結果解耦合。這至少需要三個部分,簡單地,我們定義三個介面Future、Callable和FutureService來完成這一過程。
1.1.1 Future介面
Future提供獲取執行結果和判斷任務是否完成的兩個介面,實際上它的實現類是對執行結果的一種封裝。
1 public interface Future<T> { 2 3 //返回計算結果 4 T get() throws InterruptedException; 5 6 //任務是否已經執行完成 7 void finished(T result); 8 9 }Future
1.1.2 Callable介面
Callable主要提供給呼叫者實現計算邏輯,並返回最終的計算結果。該介面實際作用與Runnable類似,都是為那些其例項可能被另一個執行緒執行的類設計,是對任務的一種封裝。但是Runnable不會返回結果,並且無法丟擲經過檢查的異常。
1 public interface Callable<T> { 2 3 T call() throws Exception; 4 5 } 6Callable
1.1.3 FutureService介面
FutureService主要用於提交任務,它對Future和Callable進行耦合,將執行結果賦予Future,否則任務和結果就無法產生聯絡。這裡提供兩種方式,一種是Callable,另一種是Runnable提交,實際上區別不大。
1 public interface FutureService<T> { 2FutureService3 //提交Runnable任務,則Future.get返回null 4 Future<?> submit(Runnable runnable); 5 6 Future<T> submit(Callable<T> callable); 7 8 }
1.2 程式實現
1.2.1 FutureIml
FutureIml實現了接收任務執行結果的方法和任務結果提取的方法,在get結果時,若任務還未執行結束,該方法陷入阻塞(可比較producer/consumer的實現)。
1 public class FutureIml<T> implements Future<T> { 2 3 private T result; 4 5 private boolean isDone = false; 6 7 private ReentrantLock lock = new ReentrantLock(); 8 9 private Condition condition = lock.newCondition(); 10 11 @Override 12 public T get() throws InterruptedException { 13 lock.lock(); 14 try{ 15 while (!isDone){ 16 condition.await(); 17 } 18 return result; 19 }finally { 20 lock.unlock(); 21 } 22 } 23 24 @Override 25 public void finished(T result) { 26 lock.lock(); 27 try { 28 this.result = result; 29 this.isDone = true; 30 condition.signalAll(); 31 } finally { 32 lock.unlock(); 33 } 34 } 35 }FutureIml
1.2.2 FutureServiceIml
FutureServiceIml實現了對任務和結果的聯絡,可以看到,這種非同步的實現是基於多執行緒的,另起一個新執行緒執行任務,並在任務結束時將結果傳給Future。
1 public class FutureServiceIml<T> implements FutureService<T>{ 2 3 private final static String PRIFIX = "FUTURE-"; 4 5 private AtomicInteger integer = new AtomicInteger(); 6 7 private String getName(){ 8 return PRIFIX+integer.getAndIncrement(); 9 } 10 @Override 11 public Future<?> submit(Runnable runnable) { 12 final FutureIml<Void> future = new FutureIml<>(); 13 new Thread(()->{ 14 runnable.run(); 15 future.finished(null); 16 }, getName()).start(); 17 return future; 18 } 19 20 @Override 21 public Future<T> submit(Callable<T> callable) { 22 final FutureIml<T> future = new FutureIml<>(); 23 new Thread(()->{ 24 try { 25 T result = callable.call(); 26 future.finished(result); 27 } catch (Exception e) { 28 e.printStackTrace(); 29 } 30 }, getName()).start(); 31 return future; 32 } 33 }FutureServiceIml
測試結果:
1 public class FutureTest { 2 public static void main(String[] args) throws InterruptedException { 3 FutureService service = new FutureServiceIml(); 4 Future future = service.submit(new Callable<String>() { 5 @Override 6 public String call() throws Exception { 7 return getString(); 8 } 9 }); 10 11 System.out.println("Not blocked"); 12 13 System.out.println(future.get()); 14 } 15 16 public static String getString() throws InterruptedException { 17 TimeUnit.SECONDS.sleep(3); 18 return "FINISHED"; 19 } 20 }FutureTest
1.2.3 增加回調機制
Future很明視訊記憶體在一個弊端,即get方法會阻塞。這就會造成當呼叫submit後,立刻呼叫get,則該阻塞與不使用Future模式的情況幾乎相同了。這裡的解決辦法是回撥機制,它可以讓呼叫者不再進行顯示地通過get方式獲得資料而導致阻塞。
Future<T> submit(Callable<T> callable, Callback<T> callback);
我們將回調介面在任務提交時作為引數注入,這個Callback(JDK1.8)主要用來接受並處理任務的計算結果,當提交的任務執行完成之後,會將結果傳遞給Callback介面進行進一步的執行,這樣在提交任務之後不再會因為通過get方法獲得結果而陷入阻塞。
二、ExexecutorService中的submit
和們實現的簡單的Future模式一樣,ExecutorService中也包含三個部分,並且它是線上程池併發框架下應用的,因此還對Runnable介面進行了適配。
2.1 Callable與Future
Callable對任務進行了封裝,但Future提供了更豐富的細節,除了可以從中拿到執行結果外,還可以實現對任務的控制部分控制(取消任務)。
1 public interface Callable<V> { 2 V call() throws Exception; 3 } 4 5 public interface Future<V> { 6 7 boolean cancel(boolean mayInterruptIfRunning); 8 9 boolean isCancelled(); 10 11 boolean isDone(); 12 13 V get() throws InterruptedException, ExecutionException; 14 15 V get(long timeout, TimeUnit unit) 16 throws InterruptedException, ExecutionException, TimeoutException; 17 }Callable And Future
2.2 介面卡
Callable和Runnable類似,是為那些其例項可能被另一個執行緒執行的類設計,是對任務的封裝。而執行緒只能接收Runnable任務,線上程池中最終的任務提交介面,execute方法只接收Runnable任務,因此我們需要兼顧Runnable、Callable和Future。
在我們自己的實現中,Callable被放在run方法下執行。線上程池中提供了更嚴謹的設計,它使用了兩個介面卡RunnableFuture和RunnableAdapter。
1 public interface RunnableFuture<V> extends Runnable, Future<V> { 2 void run(); 3 }RunnableFuture
RunnableFuture是對Runnable介面和Future介面的適配,表示可以被控制狀態的Runnable。通過實現這個介面,使得非同步任務也可以通過excute方法提交。
1 static final class RunnableAdapter<T> implements Callable<T> { 2 final Runnable task; 3 final T result; 4 RunnableAdapter(Runnable task, T result) { 5 this.task = task; 6 this.result = result; 7 } 8 public T call() { 9 task.run(); 10 return result; 11 } 12 }RunnableAdapter
RunnableAdapter是對Runnable和Callalbe的適配,實現了Callable介面,並聚合了Runnable物件。這個實現的主要功能是將Runnable轉化為可以非同步執行的任務,但這種方式Runnable也並不會給出任務執行結果,result值是使用者傳入的。(這裡將Callable與Runnable進行適配,當然不用適配也可以做到非同步執行,但是需要新的類實現該功能,因此很沒必要)。
2.3 FutureTask
我們已經瞭解了Callable、Future以及介面卡,現在需要一個類似FutureService,對任務與結果進行整理,這就是FutureTask的主要工作。FutureTask實現了RunnableFuture介面,當execute方法提交該非同步任務時,最終執行的是FutureTask類中的run方法。
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 4 return; 5 try { 6 Callable<V> c = callable; 7 if (c != null && state == NEW) { 8 V result; 9 boolean ran; 10 try { 11 result = c.call(); 12 ran = true; 13 } catch (Throwable ex) { 14 result = null; 15 ran = false; 16 setException(ex); 17 } 18 if (ran) 19 set(result); 20 } 21 } finally { 22 //在狀態設定之前,runner必須為非null,以防止對run()的併發呼叫 23 runner = null; 24 25 //為防止洩漏中斷,必須在清空執行程式後重新讀取狀態 26 int s = state; 27 if (s >= INTERRUPTING) 28 handlePossibleCancellationInterrupt(s); 29 } 30 }run
run方法中的工作與我們自實現的方法類似,主要是在run方法中執行call方法,並把執行結果交給Future。除此之外FutureTask還有一些需要關注的點。
1 private Callable<V> callable; 2 private Object outcome; 3 private volatile Thread runner; 4 private volatile WaitNode waiters;field
這四個域分別是任務,結果,執行執行緒和一個連結串列。這個WaitNode是用於在多個執行緒get結果被阻塞時,用於儲存這些阻塞的執行緒,類中還有具體的阻塞喚醒方法,不做贅述。
2.4 整體執行過程
整體執行過程從ExecutorService的submit方法看,它將要執行的非同步任務進一步封裝進實現了Runnable介面的FutureTask中,使之能被excute提交。通過excete提交後將Future返回,通過Future拿取執行結果。
使用執行緒池本身就是非同步的,但是Runnable介面不能返回結果,通過Future設計模式,也就曲線救國了。
1 public <T> Future<T> submit(Callable<T> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<T> ftask = newTaskFor(task); 4 execute(ftask); 5 return ftask; 6 } 7 8 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 9 return new FutureTask<T>(callable); 10 } 11 12 public FutureTask(Callable<V> callable) { 13 if (callable == null) 14 throw new NullPointerException(); 15 this.callable = callable; 16 this.state = NEW; 17 }submit