Future類使用以及原理
1、Future Callable FutureTask 源碼說明
JDK內置的Future主要使用到了Callable接口和FutureTask類。
Callable是類似於Runnable的接口,實現Callable接口的類和實現Runnable的類都是可被其他線程執行的任務。Callable接口的定義如下:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Callable的類型參數是返回值的類型。例如:
Callable<Integer>表示一個最終返回Integer對象的異步計算。
Future保存異步計算的結果。實際應用中可以啟動一個計算,將Future對象交給某個線程,然後執行其他操作。Future對象的所有者在結果計算好之後就可以獲得它。Future接口具有下面的方法:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
get & get(long timeout, TimeUnit unit)
第一個get方法的調用被阻塞,直到計算完成。如果在計算完成之前,第二個get方法的調用超時,拋出一個TimeoutException異常。如果運行該計算的線程被中斷,兩個方法都將拋出InterruptedException。如果計算已經完成,那麽get方法立即返回。
isDone
如果計算還在進行,isDone方法返回false;如果完成了,則返回true。
cancel:
可以用cancel方法取消該計算。如果計算還沒有開始,它被取消且不再開始。如果計算處於運行之中,那麽如果mayInterrupt參數為true,它就被中斷。
true : 任務狀態= INTERRUPTING = 5。如果任務已經運行,則強行中斷。如果任務未運行,那麽則不會再運行
false:CANCELLED = 4。如果任務已經運行,則允許運行完成(但不能通過get獲取結果)。如果任務未運行,那麽則不會再運行
isCancelled
判斷是夠被取消;
FutureTask包裝器是一種非常便利的機制,同時實現了Future和Runnable接口。FutureTask有2個構造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
下面具體分析下FutureTask的實現,先看JDK8的,再比較一下JDK6的實現。
既然FutureTask也是一個Runnable,那就看看它的run方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 這裏的callable是從構造方法裏面傳人的 if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 保存call方法拋出的異常 } if (ran) set(result); // 保存call方法的執行結果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
這裏表示狀態的屬性state是個什麽鬼
// Possible state transitions:
//1)執行過程順利完成:NEW -> COMPLETING -> NORMAL
//2)執行過程出現異常:NEW -> COMPLETING -> EXCEPTIONAL
//3)執行過程被取消:NEW -> CANCELLED
//4)執行過程中,線程中斷:NEW -> INTERRUPTING -> INTERRUPTED
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
get方法:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
/** 這裏的if else的順序也是有講究的。
1.先判斷線程是否中斷,中斷則從隊列中移除(也可能該線程不存在於隊列中)
2.判斷當前任務是否執行完成,執行完成則不再阻塞,直接返回。
3.如果任務狀態=COMPLETING,證明該任務處於已執行完成,正在切換任務執行狀態,CPU讓出片刻即可
4.q==null,則證明還未創建節點,則創建節點
5.q節點入隊
6和7.阻塞 **/
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
可以看到get方法中使用了for循環,在任務沒有執行完成return之前,一直處於阻塞狀態,通過阻塞當前線程,直至run方法執行完成,state狀態變為大於 COMPLETING
下邊看一下report方法;
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
cancel方法:
mayInterruptIfRunning用來決定任務的狀態。 true : 任務狀態= INTERRUPTING = 5。如果任務已經運行,則強行中斷。如果任務未運行,那麽則不會再運行 false:CANCELLED = 4。如果任務已經運行,則允許運行完成(但不能通過get獲取結果)。如果任務未運行,那麽則不會再運行 **/ public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; //調用 Thread.interrupt()強行中斷 if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); //如果任務已經執行 則執行完成(但不能通過get獲取結果)
return true;
}
Future類使用以及原理