Callable接口、Runable接口、Future接口
1. Callable與Runable區別
Java從發布的第一個版本開始就可以很方便地編寫多線程的應用程序,並在設計中引入異步處理。Thread類、Runnable接口和Java內存管理模型使得多線程編程簡單直接。
但Thread類和Runnable接口都不允許聲明檢查型異常,也不能定義返回值。沒有返回值這點稍微有點麻煩。不能聲明拋出檢查型異常則更麻煩一些。
public void run()方法契約意味著你必須捕獲並處理檢查型異常。即使你小心地保存了異常信息(在捕獲異常時)以便稍後檢查,但也不能保證這個類(Runnable對象)的所有使用者都讀取異常信息。
你也可以修改Runnable實現的getter,讓它們都能拋出任務執行中的異常。但這種方法除了繁瑣也不是十分安全可靠,你不能強迫使用者調用這些方法,程序員很可能會調用join()方法等待線程結束然後就不管了。
但是現在不用擔心了,以上的問題終於在1.5中解決了。Callable接口和Future接口的引入以及他們對線程池的支持優雅地解決了這兩個問題。
不管用哪種方式創建線程,其本質都是Callable接口與Runable接口。兩者都是可被其它線程執行的任務!!區別是:
(1)Callable規定的方法是call(),而Runnable規定的方法是run()。 (2)Callable的任務執行後可返回值,而Runnable的任務是不能返回值的。 (3)call()方法可拋出異常,而run()方法是不能拋出異常的。 (4)運行Callable任務可拿到一個Future對象。
2.Future
如上所說,Callable任務返回Future對象。即:Callable和Future一個產生結果,一個拿到結果。
Future 表示異步計算的結果。Future接口中有如下方法:
- boolean cancel(boolean mayInterruptIfRunning)
取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束
- boolean isCancelled()
任務是否已經取消,任務正常完成前將其取消,則返回 true
- boolean isDone()
任務是否已經完成。需要註意的是如果任務正常終止、異常或取消,都將返回true
- V get
等待任務執行結束,然後獲得V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,如果任務被取消,還會拋出CancellationException
- V get(long timeout, TimeUnit unit)
同上面的get功能一樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。如果計算超時,將拋出TimeoutException
Future接口提供方法來檢測任務是否被執行完,等待任務執行完獲得結果。也可以設置任務執行的超時時間,這個設置超時的方法就是實現Java程序執行超時的關鍵。
所以,如果需要設定代碼執行的最長時間,即超時,可以用Java線程池ExecutorService類配合Future接口來實現。
三個簡單的小例子,體會一下:
package com.zyf.Future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureGetTimeOut1 { public static void main(String[] args){ int timeout = 2; ExecutorService executor = Executors.newSingleThreadExecutor(); Boolean result = false; Future<Boolean> future = executor.submit(new TaskThread("發送請求"));//將任務提交給線程池 try { result = future.get(timeout, TimeUnit.SECONDS); // result = future.get(timeout, TimeUnit.MILLISECONDS); //1 System.out.println("發送請求任務的返回結果: "+result); //2 } catch (InterruptedException e) { System.out.println("線程中斷出錯。"); future.cancel(true);// 中斷執行此任務的線程 } catch (ExecutionException e) { System.out.println("線程服務出錯。"); future.cancel(true); } catch (TimeoutException e) {// 超時異常 System.out.println("超時。"); future.cancel(true); }finally{ System.out.println("線程服務關閉。"); executor.shutdown(); } } static class TaskThread implements Callable<Boolean> { private String t; public TaskThread(String temp){ this.t= temp; } public Boolean call() { //for用於模擬超時 for(int i=0;i<999999999;i++){ if(i==999999998){ System.out.println(t+"成功!"); } if (Thread.interrupted()){ //很重要 return false; } } System.out.println("繼續執行.........."); return true; } } }
package com.zyf.Future; import java.util.concurrent.*; public class FutureGetTimeOut2 { public static void main(String[] args) { final ExecutorService service = Executors.newFixedThreadPool(1); TaskThread taskThread = new TaskThread(); System.out.println("提交任務...begin"); Future<Object> taskFuture = service.submit(taskThread); System.out.println("提交任務...end"); try { Object re = taskFuture.get(60000, TimeUnit.MILLISECONDS);// 超時設置 System.out.println(re); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { System.out.println("超時 取消任務"); taskFuture.cancel(true); System.out.println("超時 取消任務OK"); } finally { service.shutdown(); } } } class TaskThread implements Callable<Object> { public Object call() throws Exception { String result = "空結果"; try { System.out.println("任務開始...."); //修改sleep 的值測試超時 Thread.sleep(500); result = "正確結果"; System.out.println("任務結束...."); } catch (Exception e) { System.out.println("Task is interrupted!"); } return result; } }
package com.zyf.Future; import java.util.concurrent.*; class MyCallable implements Callable<Object> { private int flag = 0; public MyCallable(int flag) { this.flag = flag; } public String call() throws Exception { if (this.flag == 0) { return "flag = 0"; } if (this.flag == 1) { try { while (true) { System.out.println("looping."); Thread.sleep(2000); } } catch (InterruptedException e) { System.out.println("Interrupted"); } return "false"; } else { throw new Exception("Bad flag value!"); } } } public class FutureGetBlock { public static void main(String[] args) { // 定義3個Callable類型的任務 MyCallable task1 = new MyCallable(0); MyCallable task2 = new MyCallable(1); MyCallable task3 = new MyCallable(2); // 創建一個執行任務的服務 ExecutorService es = Executors.newFixedThreadPool(3); try { // 提交並執行任務,任務啟動時返回了一個Future對象, // 如果想得到任務執行的結果或者是異常可對這個Future對象進行操作 Future<?> future1 = es.submit(task1); // 獲得第一個任務的結果,如果調用get方法,當前線程會等待任務執行完畢後才往下執行 System.out.println("task1: " + future1.get()); Future<?> future2 = es.submit(task2); // 等待5秒後,再停止第二個任務。因為第二個任務進行的是無限循環 Thread.sleep(5000); System.out.println("task2 cancel: " + future2.cancel(true)); // 獲取第三個任務的輸出,因為執行第三個任務會引起異常 // 所以下面的語句將引起異常的拋出 Future<?> future3 = es.submit(task3); System.out.println("task3: " + future3.get()); } catch (Exception e) { System.out.println(e.toString()); } // 停止任務執行服務 es.shutdownNow(); } }
3. Future實現類
3.1 FutureTask
FutureTask是一個RunnableFuture<V>,而RunnableFuture實現了Runnbale又實現了Futrue<V>這兩個接口,
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
另外它還可以包裝Runnable和Callable<V>
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
可以看到,Runnable會被Executors.callable()函數轉換為Callable類型,即FutureTask最終都是執行Callable類型的任務。該適配函數的實現如下 :
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } /** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
由於FutureTask實現了Runnable,因此它既可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行。見下面兩個例子:
package com.zyf.Future; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static void main(String[] args) { Callable<Integer> callable = new Callable<Integer>() { public Integer call() throws Exception { return new Random().nextInt(100); } }; FutureTask<Integer> future = new FutureTask<Integer>(callable); new Thread(future).start(); try { Thread.sleep(1000);// 可能做一些事情 int result = future.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
package com.zyf.Future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; public class FutureTaskDemo2 { static ExecutorService mExecutor = Executors.newSingleThreadExecutor(); public static void main(String[] args) { futureDemo(); } static void futureDemo() { try { /** * 提交runnable則沒有返回值, future沒有數據 */ Future<?> future = mExecutor.submit(new Runnable() { @Override public void run() { fibc(20); } }); System.out.println("future result from runnable : " + future.get()); /** * 提交Callable, 有返回值, future中能夠獲取返回值 */ Future<Integer> result2 = mExecutor.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return fibc(20); } }); System.out.println("future result from callable : " + result2.get()); /** * FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個接口, * 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable * <V>,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行 * ,並且還可以通過v get()返回執行結果,在線程體沒有執行完成的時候,主線程一直阻塞等待,執行完則直接返回結果。 */ FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return fibc(20); } }); // 提交futureTask mExecutor.submit(futureTask); System.out.println("future result from futureTask : " + futureTask.get()); mExecutor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /** * 效率底下的斐波那契數列, 耗時的操作 * * @param num * @return */ static int fibc(int num) { if (num == 0) { return 0; } if (num == 1) { return 1; } return fibc(num - 1) + fibc(num - 2); } }
如果要執行多個帶返回值的任務,並取得多個返回值,兩種方法:
1.先創建一個裝Future類型的集合,用Executor提交的任務返回值添加到集合中,最後便利集合取出數據。
這時候,submit的task不一定是按照加入自己維護的list順序完成的。從list中遍歷的每個Future對象並不一定處於完成狀態,這時調用get()方法就會被阻塞住。
如果系統是設計成每個線程完成後就能根據其結果繼續做後面的事,這樣對於處於list後面的但是先完成的線程就會增加了額外的等待時間。
所以jdk1.8增加了Future接口的另外一個實現類CompletionService
2.CompletionService相當於Executor加上BlockingQueue,使用場景為當子線程並發了一系列的任務以後,主線程需要實時地取回子線程任務的返回值並同時順序地處理這些返回值,誰先返回就先處理誰。
而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那裏,直到有完成的Future對象加入到Queue中。
所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。
Callable接口、Runable接口、Future接口