解開Future的神祕面紗之獲取結果
前言
在前面的兩篇博文中,已經介紹利用FutureTask任務的執行流程,以及利用其實現的cancel方法取消任務的情況。本篇就來介紹下,執行緒任務的結果獲取。
利用get方法獲取程式執行結果
我們知道利用Future介面的最重要的操作就是要獲取任務的結果,而此操作對應的方法就是get。但是問題來了,如果我呼叫get方法的時候,任務還沒有完成呢?答案就是,等它完成,當前執行緒將被阻塞,直到任務完成(注意,這裡說的完成,指的是任務結束,因為異常而結束也算),get方法返回。主執行緒(不是執行任務的執行緒)才被喚醒,然後繼續執行。
靈活的get方法
有人可能會問,如果我呼叫get方法的時候,任務離完成還需要很長時間,那麼我主執行緒不是會浪費一些時間?是的,如果主執行緒比較忙的話,這樣確實主執行緒的效率。不過還有一個有參的get方法,此方法以等待時長為引數,如果時長結束,任務還沒完成,主執行緒將繼續執行,然後會在之後的某個時間再來獲取任務結果。(當然如果主執行緒依賴這個任務結果才能繼續執行,那麼只能老老實實地等了
FutureTask的阻塞模型
要想了解get方法的具體實現,必須先弄清楚,它是如何阻塞的。前篇博文已經提到,FutureTask有型別為WaitNode欄位waiters,實際上這個waiters引用的是一個以WaitNode為節點的單向連結串列的頭節點。如圖所示:
waitNode類程式碼如下:
static final class WaitNode { volatile Thread thread; //執行緒 volatile WaitNode next; //下一個節點 //建構函式獲取當前執行執行緒的引用 WaitNode() { thread = Thread.currentThread(); } }
WaitNode保留執行緒引用的作用是什麼?
答案是用於任務完成後喚醒等待執行緒。當FutureTask執行完callable的run方法後,將執行finishCompletion方法通知所有等待執行緒
private void finishCompletion() { //遍歷等待節點 for (WaitNode q; (q = waiters) != null;) { //將FutureTask的waiters引用置null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //喚醒所有等待執行緒 for (;;) { //取出節點對應的執行緒 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //喚醒對應執行緒 } //獲取下一個節點 WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //呼叫鉤子函式done,此為空方法,子類可根據需求進行實現 done(); callable = null; }
執行緒的阻塞方式——park和unPark
park/unPark也是用於控制執行緒等待狀態的。我們熟悉的,用於控制執行緒等待狀態的還有wait/notify。wait/notify是某個物件的條件佇列,要阻塞執行緒,或者說要加入等待佇列,必須先獲取物件的鎖。
與wait()/notify不同的是,park和unpark直接操作執行緒,無需獲取物件的鎖,個人認為這是這裡使用park/unPark,而不是wait/notifyAll的原因,因為獲取鎖需要額外的開銷。
get方法的具體實現
以下是FutureTask中get方法的實現
public V get() throws InterruptedException, ExecutionException { //獲取當前任務狀態 int s = state; //如果是NEW或者COMPLETING,也就是還沒有結束,就呼叫awaitDone進行阻塞 if (s <= COMPLETING) s = awaitDone(false, 0L); //注意,這裡的引數,表示非超時等待,如果任務未結束,程式將一直卡在這裡 //如果awaitDone返回,也就是任務已經結束,根據任務狀態,返回結果 return report(s); }
以下是get方法中呼叫到的awaitDone的實現
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //根據超時時間,計算結束時間點 final long deadline = timed ? System.nanoTime() + nanos : 0L; //等待節點 WaitNode q = null; //是否加入等待佇列 boolean queued = false; //這裡並不是通過自旋,使方法無法返回。而是利用自旋CAS, 改變狀態。如果成功,一次就夠了 for (;;) { //如果此執行緒被中斷,把從節點從等待佇列中移除 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //如果狀態大於COMPLETING,也就是任務已結束,返回任務狀態 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); //第一次迴圈,q是null,建立節點 else if (q == null) q = new WaitNode(); //如果還未加入等待佇列,就加入 else if (!queued) //q.next = waiters 表示式的返回值 是左側的值,也就是waiters //意思是,如果當前物件的waiters的值是waiters, 就將他賦值為q queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //如果是超時等待,則呼叫parkNanos, 執行緒將在指定時間後被喚醒 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //如果不是超時等待,且已經加入等待佇列,這時候利用park將當前執行緒掛起 else LockSupport.park(this); } }
很多人可能會覺得這個迴圈體,看著有點迷糊,我剛開始也看得頭大。但是我們可以根據幾種情境,來檢視這幾種情境下程式碼的執行情況。
注:第二個for迴圈內,第二個if-else塊是一個大塊,每次只執行一個。
幾種執行情境
一、當前執行緒成功加入等待佇列,且被阻塞,一段時間後任務完成,執行緒被喚醒
二、當前執行緒加入佇列後,還沒被阻塞,任務就已經完成了
三、因為其他執行緒加入等待佇列的影響,當前執行緒未能加入等待佇列
這裡說明一下,如果其他執行緒在此執行緒之前,比較接近的時間,加入了等待佇列,由於記憶體可見性的原因,當前執行緒看到的waiters值沒有及時改變,故與其實際值不同,CAS操作就將失敗。
為什麼一定要CAS成功?答案是,如果不成功,出現執行緒安全問題,連結串列的結構就會一塌糊塗。這裡不細談。
根據任務狀態獲取結果
我們已經知道,FutureTask有一個Object欄位的outcome,也就是任務執行的結果。當任務完成後,會將結果賦值給它。以下是FutureTask的run方法:
public void run() { //任務開始執行後,設定FutureTask的runner欄位,指明執行它的執行緒 if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())) return; try { //獲取具體任務 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; //任務是否已被執行完 try { //執行任務 result = c.call(); ran = true; } catch (Throwable ex) { result = null; //如果執行任務過程中出現異常,則ran=false 表示沒有執行完成 ran = false; //設定異常 => 將任務狀態設定為異常,並將異常資訊賦值給outcome, 也就是任務結果 //這個方法會呼叫finishCompletion setException(ex); } //如果執行完成,把結果賦值給outcome if (ran) set(result); //這個方法會呼叫finishCompletion } } finally { //既然執行緒已經"完成"當前任務,就放棄引用,防止影響它執行其他任務 runner = null; //重新獲取任務狀態 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
由前文可知,當任務"完成"的時候,獲取結果的執行緒將被喚醒。回到get方法,它將獲取到任務的狀態,並根據任務狀態獲取結果。也就是report方法:
private V report(int s) throws ExecutionException { //獲取結果 Object x = outcome; //如果任務正常完成 if (s == NORMAL) //強制轉換為對應型別並返回 return (V)x; //如果任務狀態為CANCELLED、INTERRUPTING、INTERRUPTED表明是通過cacel方法取消了 //返回已取消異常 if (s >= CANCELLED) throw new CancellationException(); //如果是因為異常中斷的話,丟擲具體異常資訊 throw new ExecutionException((Throwable)x); }