java非同步呼叫future、callable以及futuretask分析
Runnable與Callable
java中用於新建執行緒的有兩大介面:Runnable和Callable。其中,Runnable用的比較多,多用於不需要子執行緒返回結果的處理。
public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); }
通過介面說明可以發現,Runnable介面有個run方法,返回是void。我們在run方法中實現子執行緒的邏輯。
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
而觀察Callable介面,可以發現其方法call可以返回V型別的返回值。那麼如何獲取實現Callable介面的子執行緒的返回值呢。答案是:通過執行緒池的submit方法和實現了callable介面的物件,可以返回一個future介面的物件,通過此物件可以非同步獲取callable方法的執行結果。(一般callable與執行緒池一起使用)
future.get() 阻塞式的返回結果,如果任務未執行完畢,就阻塞等待結果產生。
future.get(10000, TimeUnit.MILLISECONDS); 在時間範圍內得到結果,沒有的話就報錯
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();:判斷子執行緒是否已取消
boolean isDone();:判斷子執行緒是否已完成
public class FutureTest {
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
//建立執行緒池
ExecutorService executorService=Executors.newCachedThreadPool();
//建立callable的任務
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
Thread.sleep(10000);
return "茶葉準備好啦";
}
};
//將callable任務放入執行緒池中
Future<String> submit = executorService.submit(callable);
//可以先做自己的事
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("把開水準備好");
//通過submit查詢結果
System.out.println("茶葉是否準備好?"+submit.isDone());
String string = submit.get();
System.out.println(string+"開水"+",可以泡茶啦");
//關閉執行緒池
executorService.shutdown();
}
}
上面的 例子,通過executorService.submit()提交一個callable的非同步任務(即假如讓外賣送茶葉來(需要花10s)),在提交任務後,主執行緒可以幹自己的事,即花了2s來燒開水。水燒開後,通過future介面的物件來查詢任務是否已經完成(即茶葉是否送到),或者阻塞式的等待結果。最後關閉執行緒池。
一般來說,Runnable介面可以通過new Thread().start來實現,而Callable是通過執行緒池來提交。那麼有沒有方法可以讓Callable介面通過new Thread().start來實現呢。答案就是下面的FutureTask來實現。
Futuretask
來看看javadoc的說明:
A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset).
A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.
In addition to serving as a standalone class, this class provides protected functionality that may be useful when creating customized task classes.
Type Parameters:
<V> The result type returned by this FutureTask's get methods
Since:
1.5
Author:
Doug Lea
翻譯一下就是:可以取消的非同步(執行緒)計算。是介面future的具體實現,提供了啟動、取消、查詢子執行緒是否完成以及非同步(事後)查詢子執行緒的結果等方法。其中get方法將會阻塞直至等待子執行緒的方法完成。當子執行緒方法完成後,子執行緒就不能再次啟動或者取消。
public class FutureTask<V> implements RunnableFuture<V> {}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
通過上面的程式碼可以看到,FutureTask實現了RunnableFuture介面,而RunnableFuture又繼承了Runnable和Future介面。因此FutureTask即可以作為Runnable的實現類,又可以獲取實現了Callable的介面的方法的返回結果。
再看看FutureTask的構造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask有兩個構造方法:一個是傳入實現了Callable的實現類,通過FutureTask可以非同步獲得這個實現類的計算結果。另一個是傳入實現了Runnable介面的實現類和基於泛型的返回引數。當Runnable的實現類的方法執行完畢後,就會返回這個泛型的引數。
其中 this.callable = Executors.callable(runnable, result);是將runnable的實現類轉為基於callable介面的實現類。通過檢視其實現可以證實:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result); //這裡的RunnableAdapter就是個介面卡
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
//重點在這裡, 其實call的實現就是run方法執行完後返回result。
public T call() {
task.run();
return result;
}
}
看到這裡,就可以明白上一章節說到callable的實現類也可以通過new Thread().start()來實現。因為FutureTask也是實現了Runnable介面的,因此它可以通過new Thread().start()來啟動,同時它的構造方法可以傳入callable的實現類,因此callable的實現類也可以通過new Thread().start()來實現。
public class FutureTaskTest2 {
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
//實現callable介面的futuretask
FutureTask<String> task=new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
Thread.sleep(5000);
return "test";
}
});
Thread thread=new Thread(task);
thread.start();
System.out.println("main thread start");
Thread.sleep(2000);
System.out.println("task 是否完成"+task.isDone());
System.out.println("等待task 完成"+task.get());
}
}
不過,一般子執行緒的呼叫,都建議用執行緒池來實現。下面的例子是ExecutorService+FutureTask的非同步獲取子執行緒結果的例子。
public class FutureTaskTest {
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
* 例子裡FutureTask的構造方法傳入的是runnable,其實傳入callable的也是類似的
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
//建立執行緒池
ExecutorService executorService=Executors.newCachedThreadPool();
//建立runnable的任務
Runnable runnable = new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("茶葉準備好啦");
}
};
//將runnable任務放入執行緒池中
String value="執行成功";
FutureTask<String> task=new FutureTask<String>(runnable,value );
executorService.execute(task);
System.out.println("runnable執行狀態"+task.isDone());
String object = (String) task.get();
System.out.println("runnable執行狀態"+object);
//關閉執行緒池
executorService.shutdown();
}
}
FutureTask的原理實現
FutureTask是future的具體實現,重點關注其幾個方法。在分析方法之前,先看看其原始碼裡的一個變數
/**
The run state of this task, initially NEW. The run state transitions to a terminal state only in methods set, setException,
and cancel. During completion, state may take on transient values of COMPLETING (while outcome is being set) or
INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). Transitions from these intermediate to final
states use cheaper ordered/lazy writes because values are unique and cannot be further modified.
Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
* */
private volatile int state; //其有7種狀態。
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state用於表示Futuretask裡的子執行緒執行狀態。
NEW:表示是個新的任務或者還沒被執行完的任務。這是初始狀態。
COMPLETING:任務已經執行完成或者執行任務的時候發生異常,但是任務執行結果或者異常原因還沒有儲存到outcome欄位(outcome欄位用來儲存任務執行結果,如果發生異常,則用來儲存異常原因)的時候,狀態會從NEW變更到COMPLETING。但是這個狀態會時間會比較短,屬於中間狀態。
NORMAL:任務已經執行完成並且任務執行結果已經儲存到outcome欄位,狀態會從COMPLETING轉換到NORMAL。這是一個最終態。
EXCEPTIONAL:任務執行發生異常並且異常原因已經儲存到outcome欄位中後,狀態會從COMPLETING轉換到EXCEPTIONAL。這是一個最終態。
CANCELLED:任務還沒開始執行或者已經開始執行但是還沒有執行完成的時候,使用者呼叫了cancel(false)方法取消任務且不中斷任務執行執行緒,這個時候狀態會從NEW轉化為CANCELLED狀態。這是一個最終態。
INTERRUPTING: 任務還沒開始執行或者已經執行但是還沒有執行完成的時候,使用者呼叫了cancel(true)方法取消任務並且要中斷任務執行執行緒但是還沒有中斷任務執行執行緒之前,狀態會從NEW轉化為INTERRUPTING。這是一箇中間狀態。
INTERRUPTED:呼叫interrupt()中斷任務執行執行緒之後狀態會從INTERRUPTING轉換到INTERRUPTED。這是一個最終態。
有一點需要注意的是,所有值大於COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。(轉載自http://beautyboss.farbox.com/post/study/shen-ru-xue-xi-futuretask)
首先來看看run方法,這個方法是在FutureTask任務提交給執行緒池進行submit之後執行,是非同步執行緒執行的方法內容。
public void run() {
// 1. 狀態如果不是NEW,說明任務或者已經執行過,或者已經被取消,直接返回
// 2. 狀態如果是NEW,則嘗試把當前執行執行緒儲存在runner欄位中
// 如果賦值失敗則直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; //得到callable,callable是通過構造方法傳遞進來的
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //執行call方法的內容
ran = true;
} catch (Throwable ex) {
result = null; //如果發生異常,ran狀態就設為false ,比如此執行緒執行一半,被外部的submit.cancel(true)
//則此執行緒就被中斷,發生異常
ran = false;
setException(ex); //設定異常結果
}
if (ran)
set(result); //如果正常執行費結束,就設定結果 ①
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
再看get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) //如果state<COMPLETING,說明非同步計算還沒結束
s = awaitDone(false, 0L); //等待非同步計算執行完畢
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING && //如果還未執行完畢,而且等待的時間超出了timeout,就報錯
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
//report根據state狀態返回結果還是異常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
}
重點看看awaitDone方法。
//等待非同步計算結束或者因為time超出而停止
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//WaitNode是個有next指標的連結串列
WaitNode q = null;
boolean queued = false;
for (;;) { // cas裡經常用到的for死迴圈
if (Thread.interrupted()) { //這裡的Thread是指get方法所屬的thread,而不是task裡callable所指的thread
//如果呼叫get方法所屬的thread被中斷了,則將q移除出等待結果的連結串列
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { //如果state狀態>COMPLETING,說明非同步計算有了結果。不用繼續等待了。
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield(); //同理,這裡的Thread也是指外面呼叫get方法所屬的thread。這裡是讓出cpu,允許futuretask
//裡的執行緒繼續完成最後的賦值。
else if (q == null) //如果q==null,就建立一個waitNode。waitNode裡的thread就是上述的Thread.yield 的thread
q = new WaitNode();
else if (!queued) //如果q沒有入佇列,則將q放入waiters的首部,並且next指向waiters。即每個最新的q都是放在
//連結串列的首位
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime(); //再次計算時間
if (nanos <= 0L) { //如果小於0,說明超時了,直接返回
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); //否則將本執行緒(還是get方法所屬的那個執行緒掛起規定的時間)
}
else
LockSupport.park(this); //如果沒有時間限制,就掛起get方法所屬的那個執行緒
}
}
上面就是get方法阻塞等待非同步計算的結果的核心。從上述程式碼看到, LockSupport.park(this)操作將get所屬的執行緒掛起。那麼在哪兒給恢復呢。上上面程式碼的run()方法中的①的 set(result)方法。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //先將state從new--》COMPLETING
outcome = v; //賦值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state ,
finishCompletion(); //收尾工作
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); //在這裡喚醒get所屬的執行緒 ②
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
總結:
FutureTask用於非同步獲取子執行緒計算的結果。如果task的計算任務沒完成,那麼任何外部執行緒通過task.get()方法都會阻塞,這些阻塞執行緒會形成一個waiter連結串列。知道task的run方法執行完畢,才會依次恢復這些阻塞執行緒,從而獲取到結果。
cancal(boolean mayInterruptIfRunning)方法
javadoc的解釋:
Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started when cancel is called, this task should never run. If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.
After this method returns, subsequent calls to isDone will always return true. Subsequent calls to isCancelled will always return true if this method returned true.
翻譯一下就是:這個方法用於嘗試去取消task任務。但是如果task任務已經完成或者已經取消情況下會失敗,而且有可能因為其他原因而不能取消。如果這個方法執行,而且task任務還未開始執行,那麼task任務會取消。如果task任務已經執行,那麼會根據mayInterruptIfRunning這個引數決定task是否繼續。即,如果是false,那麼task任務會繼續執行,同時外部呼叫task的cancel所在的執行緒呼叫get方法獲取結果會報異常。如果是true,那麼task任務會取消,同時外部呼叫task的cancel所在的執行緒呼叫get方法獲取結果會報異常。
看其原始碼
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && //如果state狀態不是new,說明任務馬上結束,不能取消
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) { //否則,如果是true
try {
Thread t = runner; //這裡的runner是指callable所指的執行緒,即task任務的執行緒
if (t != null)
t.interrupt(); //取消執行緒
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
例子1: task任務已經開始,然後呼叫cancel(true),看結果
public class FutureTaskCancelTest1 {
/**
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
//建立執行緒池
ExecutorService executorService=Executors.newCachedThreadPool();
//建立runnable的任務
Callable<String> callable=new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
System.out.println("task任務開始執行");
Thread.sleep(5000);
System.out.println("task任務結束執行");
return "計算完成";
}
};
FutureTask<String> task=new FutureTask<String>(callable);
executorService.execute(task);
Thread.sleep(3000);
System.out.println("cancel方法返回值是否成功:"+task.cancel(true));
System.out.println("cancel執行狀態"+task.isDone());
String object = (String) task.get();
System.out.println("獲取task任務的計算結果"+object);
//關閉執行緒池
executorService.shutdown();
}
}
結果
task任務開始執行
Exception in thread "main" cancel方法返回值是否成功:true
cancel執行狀態true
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.andy.jcu.future.FutureTaskCancelTest1.main(FutureTaskCancelTest1.java:65)
task任務開始後過了3s呼叫cancel(true)方法,最後一句 System.out.println(“task任務結束執行”);沒有執行,說明task任務確實被取消了,同時task.get()報異常。
例子2: task任務已經開始,然後呼叫cancel(false),看結果
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
//建立執行緒池
ExecutorService executorService=Executors.newCachedThreadPool();
//建立runnable的任務
Callable<String> callable=new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
System.out.println("task任務開始執行");
Thread.sleep(5000);
System.out.println("task任務結束執行");
return "計算完成";
}
};
FutureTask<String> task=new FutureTask<String>(callable);
executorService.execute(task);
Thread.sleep(3000);
System.out.println("cancel方法返回值是否成功:"+task.cancel(false));
System.out.println("cancel執行狀態"+task.isDone());
String object = (String) task.get();
System.out.println("獲取task任務的計算結果"+object);
//關閉執行緒池
executorService.shutdown();
}
結果:
task任務開始執行
cancel方法返回值是否成功:true
cancel執行狀態true
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.andy.jcu.future.FutureTaskCancelTest1.main(FutureTaskCancelTest1.java:65)
task任務結束執行
如果任務已經執行,那麼呼叫cancel(false),其task任務會繼續執行完成。
例子2: task還沒有開始,然後呼叫cancel(false),看結果
public static void main(String[] args) throws InterruptedException, ExecutionException {
// TODO Auto-generated method stub
//建立執行緒池
ExecutorService executorService=Executors.newCachedThreadPool();
//建立runnable的任務
Callable<String> callable=new Callable<String>() {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
System.out.println("task任務開始執行");
Thread.sleep(5000);
System.out.println("task任務結束執行");
return "計算完成";
}
};
FutureTask<String> task=new FutureTask<String>(callable);
System.out.println("cancel方法返回值是否成功:"+task.cancel(false));
executorService.execute(task);
System.out.println("task是否執行完成:"+task.isDone());
//
String object = (String) task.get();
System.out.println("獲取task任務的計算結果"+object);
//關閉執行緒池
executorService.shutdown();
}
}
結果:
cancel方法返回值是否成功:true
task是否執行完成:true
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.andy.jcu.future.FutureTaskCancelTest1.main(FutureTaskCancelTest1.java:66)
在task任務還沒開始前,呼叫cancel,會將任務取消。
那什麼時候cancel返回false呢?
只有任務結束後,再取呼叫cancel,會返回false。