Java是如何實現Future模式的?萬字詳解!
JDK1.8原始碼分析專案(中文註釋)Github地址:
https://github.com/yuanmabiji/jdk1.8-sourcecode-blogs
1 Future是什麼?
先舉個例子,我們平時網購買東西,下單後會生成一個訂單號,然後商家會根據這個訂單號發貨,發貨後又有一個快遞單號,然後快遞公司就會根據這個快遞單號將網購東西快遞給我們。在這一過程中,這一系列的單號都是我們收貨的重要憑證。
因此,JDK的Future就類似於我們網購買東西的單號,當我們執行某一耗時的任務時,我們可以另起一個執行緒非同步去執行這個耗時的任務,同時我們可以乾點其他事情。當事情幹完後我們再根據future這個"單號"去提取耗時任務的執行結果即可。因此Future也是多執行緒中的一種應用模式。
擴充套件: 說起多執行緒,那麼Future又與Thread有什麼區別呢?最重要的區別就是Thread是沒有返回結果的,而Future模式是有返回結果的。
2 如何使用Future
前面搞明白了什麼是Future,下面我們再來舉個簡單的例子看看如何使用Future。
假如現在我們要打火鍋,首先我們要準備兩樣東西:把水燒開和準備食材。因為燒開水是一個比較漫長的過程(相當於耗時的業務邏輯),因此我們可以一邊燒開水(相當於另起一個執行緒),一邊準備火鍋食材(主執行緒),等兩者都準備好了我們就可以開始打火鍋了。
// DaHuoGuo.java
public class DaHuoGuo {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + ":" + "開始燒開水...");
// 模擬燒開水耗時
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + ":" + "開水已經燒好了...");
return "開水";
}
});
Thread thread = new Thread(futureTask);
thread.start();
// do other thing
System.out.println(Thread.currentThread().getName() + ":" + " 此時開啟了一個執行緒執行future的邏輯(燒開水),此時我們可以乾點別的事情(比如準備火鍋食材)...");
// 模擬準備火鍋食材耗時
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + ":" + "火鍋食材準備好了");
String shicai = "火鍋食材";
// 開水已經稍好,我們取得燒好的開水
String boilWater = futureTask.get();
System.out.println(Thread.currentThread().getName() + ":" + boilWater + "和" + shicai + "已經準備好,我們可以開始打火鍋啦");
}
}
執行結果如下截圖,符合我們的預期:
從以上程式碼中可以看到,我們使用Future主要有以下步驟:
- 新建一個
Callable
匿名函式實現類物件,我們的業務邏輯在Callable
的call
方法中實現,其中Callable的泛型是返回結果型別; - 然後把
Callable
匿名函式物件作為FutureTask
的構造引數傳入,構建一個futureTask
物件; - 然後再把
futureTask
物件作為Thread
構造引數傳入並開啟這個執行緒執行去執行業務邏輯; - 最後我們呼叫
futureTask
物件的get
方法得到業務邏輯執行結果。
可以看到跟Future使用有關的JDK類主要有FutureTask
和Callable
兩個,下面主要對FutureTask
進行原始碼分析。
擴充套件: 還有一種使用
Future
的方式是將Callable
實現類提交給執行緒池執行的方式,這裡不再介紹,自行百度即可。
3 FutureTask類結構分析
我們先來看下FutureTask
的類結構:
可以看到FutureTask
實現了RunnableFuture
介面,而RunnableFuture
介面又繼承了Future
和Runnable
介面。因為FutureTask
間接實現了Runnable
介面,因此可以作為任務被執行緒Thread
執行;此外,最重要的一點就是FutureTask
還間接實現了Future
介面,因此還可以獲得任務執行的結果。下面我們就來簡單看看這幾個介面的相關api
。
// Runnable.java
@FunctionalInterface
public interface Runnable {
// 執行執行緒任務
public abstract void run();
}
Runnable
沒啥好說的,相信大家都已經很熟悉了。
// Future.java
public interface Future<V> {
/**
* 嘗試取消執行緒任務的執行,分為以下幾種情況:
* 1)如果執行緒任務已經完成或已經被取消或其他原因不能被取消,此時會失敗並返回false;
* 2)如果任務還未開始執行,此時執行cancel方法,那麼任務將被取消執行,此時返回true;TODO 此時對應任務狀態state的哪種狀態???不懂!!
* 3)如果任務已經開始執行,那麼mayInterruptIfRunning這個引數將決定是否取消任務的執行。
* 這裡值得注意的是,cancel(true)實質並不能真正取消執行緒任務的執行,而是發出一個執行緒
* 中斷的訊號,一般需要結合Thread.currentThread().isInterrupted()來使用。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判斷任務是否被取消,在執行任務完成前被取消,此時會返回true
*/
boolean isCancelled();
/**
* 這個方法不管任務正常停止,異常還是任務被取消,總是返回true。
*/
boolean isDone();
/**
* 獲取任務執行結果,注意是阻塞等待獲取任務執行結果。
*/
V get() throws InterruptedException, ExecutionException;
/**
* 獲取任務執行結果,注意是阻塞等待獲取任務執行結果。
* 只不過在規定的時間內未獲取到結果,此時會丟擲超時異常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future
介面象徵著非同步執行任務的結果即執行一個耗時任務完全可以另起一個執行緒執行,然後此時我們可以去做其他事情,做完其他事情我們再呼叫Future.get()
方法獲取結果即可,此時若非同步任務還沒結束,此時會一直阻塞等待,直到非同步任務執行完獲取到結果。
// RunnableFuture.java
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture
是Future
和Runnable
介面的組合,即這個介面表示又可以被執行緒非同步執行,因為實現了Runnable
介面,又可以獲得執行緒非同步任務的執行結果,因為實現了Future
介面。因此解決了Runnable
非同步任務沒有返回結果的缺陷。
接下來我們來看下FutureTask
,FutureTask
實現了RunnableFuture
介面,因此是Future
和Runnable
介面的具體實現類,是一個可被取消的非同步執行緒任務,提供了Future
的基本實現,即非同步任務執行後我們能夠獲取到非同步任務的執行結果,是我們接下來分析的重中之重。FutureTask
可以包裝一個Callable
和Runnable
物件,此外,FutureTask
除了可以被執行緒執行外,還可以被提交給執行緒池執行。
我們先看下FutureTask
類的api
,其中重點方法已經紅框框出。
上圖中FutureTask
的run
方法是被執行緒非同步執行的方法,get
方法即是取得非同步任務執行結果的方法,還有cancel
方法是取消任務執行的方法。接下來我們主要對這三個方法進行重點分析。
思考:
FutureTask
覆寫的run
方法的返回型別依然是void
,表示沒有返回值,那麼FutureTask
的get
方法又是如何獲得返回值的呢?FutureTask
的cancel
方法能真正取消執行緒非同步任務的執行麼?什麼情況下能取消?
因為FutureTask
非同步任務執行結果還跟Callable
介面有關,因此我們再來看下Callable
介面:
// Callable.java
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*/
V call() throws Exception;
}
我們都知道,Callable<V>
介面和Runnable
介面都可以被提交給執行緒池執行,唯一不同的就是Callable<V>
介面是有返回結果的,其中的泛型V
就是返回結果,而Runnable
介面是沒有返回結果的。
思考: 一般情況下,
Runnable
介面實現類才能被提交給執行緒池執行,為何Callable
介面實現類也可以被提交給執行緒池執行?想想執行緒池的submit
方法內部有對Callable
做適配麼?
4 FutureTask原始碼分析
4.1 FutureTask成員變數
我們首先來看下FutureTask
的成員變數有哪些,理解這些成員變數對後面的原始碼分析非常重要。
// FutureTask.java
/** 封裝的Callable物件,其call方法用來執行非同步任務 */
private Callable<V> callable;
/** 在FutureTask裡面定義一個成員變數outcome,用來裝非同步任務的執行結果 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 用來執行callable任務的執行緒 */
private volatile Thread runner;
/** 執行緒等待節點,reiber stack的一種實現 */
private volatile WaitNode waiters;
/** 任務執行狀態 */
private volatile int state;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 對應成員變數state的偏移地址
private static final long stateOffset;
// 對應成員變數runner的偏移地址
private static final long runnerOffset;
// 對應成員變數waiters的偏移地址
private static final long waitersOffset;
這裡我們要重點關注下FutureTask
的Callable
成員變數,因為FutureTask
的非同步任務最終是委託給Callable
去實現的。
思考:
FutureTask
的成員變數runner
,waiters
和state
都被volatile
修飾,我們可以思考下為什麼這三個成員變數需要被volatile
修飾,而其他成員變數又不用呢?volatile
關鍵字的作用又是什麼呢?- 既然已經定義了成員變數
runner
,waiters
和state
了,此時又定義了stateOffset
,runnerOffset
和waitersOffset
變數分別對應runner
,waiters
和state
的偏移地址,為何要多此一舉呢?
我們再來看看stateOffset
,runnerOffset
和waitersOffset
變數這三個變數的初始化過程:
// FutureTask.java
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
4.2 FutureTask的狀態變化
前面講了FutureTask
的成員變數,有一個表示狀態的成員變數state
我們要重點關注下,state
變量表示任務執行的狀態。
// FutureTask.java
/** 任務執行狀態 */
private volatile int state;
/** 任務新建狀態 */
private static final int NEW = 0;
/** 任務正在完成狀態,是一個瞬間過渡狀態 */
private static final int COMPLETING = 1;
/** 任務正常結束狀態 */
private static final int NORMAL = 2;
/** 任務執行異常狀態 */
private static final int EXCEPTIONAL = 3;
/** 任務被取消狀態,對應cancel(false) */
private static final int CANCELLED = 4;
/** 任務中斷狀態,是一個瞬間過渡狀態 */
private static final int INTERRUPTING = 5;
/** 任務被中斷狀態,對應cancel(true) */
private static final int INTERRUPTED = 6;
可以看到任務狀態變數state
有以上7種狀態,0-6分別對應著每一種狀態。任務狀態一開始是NEW
,然後由FutureTask
的三個方法set
,setException
和cancel
來設定狀態的變化,其中狀態變化有以下四種情況:
NEW -> COMPLETING -> NORMAL
:這個狀態變化表示非同步任務的正常結束,其中COMPLETING
是一個瞬間臨時的過渡狀態,由set
方法設定狀態的變化;NEW -> COMPLETING -> EXCEPTIONAL
:這個狀態變化表示非同步任務執行過程中丟擲異常,由setException
方法設定狀態的變化;NEW -> CANCELLED
:這個狀態變化表示被取消,即呼叫了cancel(false)
,由cancel
方法來設定狀態變化;NEW -> INTERRUPTING -> INTERRUPTED
:這個狀態變化表示被中斷,即呼叫了cancel(true)
,由cancel
方法來設定狀態變化。
4.3 FutureTask建構函式
FutureTask
有兩個建構函式,我們分別來看看:
// FutureTask.java
// 第一個建構函式
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
可以看到,這個建構函式在我們前面舉的“打火鍋”的例子程式碼中有用到,就是Callable
成員變數賦值,在非同步執行任務時再呼叫Callable.call
方法執行非同步任務邏輯。此外,此時給任務狀態state
賦值為NEW
,表示任務新建狀態。
我們再來看下FutureTask
的另外一個建構函式:
// FutureTask.java
// 另一個建構函式
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
這個建構函式在執行Executors.callable(runnable, result)
時是通過介面卡RunnableAdapter
來將Runnable
物件runnable
轉換成Callable
物件,然後再分別給callable
和state
變數賦值。
注意,這裡我們需要記住的是FutureTask
新建時,此時的任務狀態state
是NEW
就好了。
4.4 FutureTask.run方法,用來執行非同步任務
前面我們有講到FutureTask
間接實現了Runnable
介面,覆寫了Runnable
介面的run
方法,因此該覆寫的run
方法是提交給執行緒來執行的,同時,該run
方法正是執行非同步任務邏輯的方法,那麼,執行完run
方法又是如何儲存非同步任務執行的結果的呢?
我們現在著重來分析下run
方法:
// FutureTask.java
public void run() {
// 【1】,為了防止多執行緒併發執行非同步任務,這裡需要判斷執行緒滿不滿足執行非同步任務的條件,有以下三種情況:
// 1)若任務狀態state為NEW且runner為null,說明還未有執行緒執行過非同步任務,此時滿足執行非同步任務的條件,
// 此時同時呼叫CAS方法為成員變數runner設定當前執行緒的值;
// 2)若任務狀態state為NEW且runner不為null,任務狀態雖為NEW但runner不為null,說明有執行緒正在執行非同步任務,
// 此時不滿足執行非同步任務的條件,直接返回;
// 1)若任務狀態state不為NEW,此時不管runner是否為null,說明已經有執行緒執行過非同步任務,此時沒必要再重新
// 執行一次非同步任務,此時不滿足執行非同步任務的條件;
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 拿到之前建構函式傳進來的callable實現類物件,其call方法封裝了非同步任務執行的邏輯
Callable<V> c = callable;
// 若任務還是新建狀態的話,那麼就呼叫非同步任務
if (c != null && state == NEW) {
// 非同步任務執行結果
V result;
// 非同步任務執行成功還是始遍標誌
boolean ran;
try {
// 【2】,執行非同步任務邏輯,並把執行結果賦值給result
result = c.call();
// 若非同步任務執行過程中沒有丟擲異常,說明非同步任務執行成功,此時設定ran標誌為true
ran = true;
} catch (Throwable ex) {
result = null;
// 非同步任務執行過程丟擲異常,此時設定ran標誌為false
ran = false;
// 【3】設定異常,裡面也設定state狀態的變化
setException(ex);
}
// 【3】若非同步任務執行成功,此時設定非同步任務執行結果,同時也設定狀態的變化
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 非同步任務正在執行過程中,runner一直是非空的,防止併發呼叫run方法,前面有呼叫cas方法做判斷的
// 在非同步任務執行完後,不管是正常結束還是異常結束,此時設定runner為null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// 執行緒執行非同步任務後的任務狀態
int s = state;
// 【4】如果執行了cancel(true)方法,此時滿足條件,
// 此時呼叫handlePossibleCancellationInterrupt方法處理中斷
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到執行非同步任務的run
方法主要分為以下四步來執行:
- 判斷執行緒是否滿足執行非同步任務的條件:為了防止多執行緒併發執行非同步任務,這裡需要判斷執行緒滿不滿足執行非同步任務的條件;
- 若滿足條件,執行非同步任務:因為非同步任務邏輯封裝在
Callable.call
方法中,此時直接呼叫Callable.call
方法執行非同步任務,然後返回執行結果; - 根據非同步任務的執行情況做不同的處理:1) 若非同步任務執行正常結束,此時呼叫
set(result);
來設定任務執行結果;2)若非同步任務執行丟擲異常,此時呼叫setException(ex);
來設定異常,詳細分析請見4.4.1小節
; - 非同步任務執行完後的善後處理工作:不管非同步任務執行成功還是失敗,若其他執行緒有呼叫
FutureTask.cancel(true)
,此時需要呼叫handlePossibleCancellationInterrupt
方法處理中斷,詳細分析請見4.4.2小節
。
這裡值得注意的是判斷執行緒滿不滿足執行非同步任務條件時,runner
是否為null
是呼叫UNSAFE
的CAS
方法compareAndSwapObject
來判斷和設定的,同時compareAndSwapObject
是通過成員變數runner
的偏移地址runnerOffset
來給runner
賦值的,此外,成員變數runner
被修飾為volatile
是在多執行緒的情況下, 一個執行緒的volatile
修飾變數的設值能夠立即刷進主存,因此值便可被其他執行緒可見。
4.4.1 FutureTask的set和setException方法
下面我們來看下當非同步任務執行正常結束時,此時會呼叫set(result);
方法:
// FutureTask.java
protected void set(V v) {
// 【1】呼叫UNSAFE的CAS方法判斷任務當前狀態是否為NEW,若為NEW,則設定任務狀態為COMPLETING
// 【思考】此時任務不能被多執行緒併發執行,什麼情況下會導致任務狀態不為NEW?
// 答案是隻有在呼叫了cancel方法的時候,此時任務狀態不為NEW,此時什麼都不需要做,
// 因此需要呼叫CAS方法來做判斷任務狀態是否為NEW
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 【2】將任務執行結果賦值給成員變數outcome
outcome = v;
// 【3】將任務狀態設定為NORMAL,表示任務正常結束
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 【4】呼叫任務執行完成方法,此時會喚醒阻塞的執行緒,呼叫done()方法和清空等待執行緒連結串列等
finishCompletion();
}
}
可以看到當非同步任務正常執行結束後,且非同步任務沒有被cancel
的情況下,此時會做以下事情:將任務執行結果儲存到FutureTask
的成員變數outcome
中的,賦值結束後會呼叫finishCompletion
方法來喚醒阻塞的執行緒(哪裡來的阻塞執行緒?後面會分析),值得注意的是這裡對應的任務狀態變化是NEW -> COMPLETING -> NORMAL。
我們繼續來看下當非同步任務執行過程中丟擲異常,此時會呼叫setException(ex);
方法。
// FutureTask.java
protected void setException(Throwable t) {
// 【1】呼叫UNSAFE的CAS方法判斷任務當前狀態是否為NEW,若為NEW,則設定任務狀態為COMPLETING
// 【思考】此時任務不能被多執行緒併發執行,什麼情況下會導致任務狀態不為NEW?
// 答案是隻有在呼叫了cancel方法的時候,此時任務狀態不為NEW,此時什麼都不需要做,
// 因此需要呼叫CAS方法來做判斷任務狀態是否為NEW
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 【2】將異常賦值給成員變數outcome
outcome = t;
// 【3】將任務狀態設定為EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 【4】呼叫任務執行完成方法,此時會喚醒阻塞的執行緒,呼叫done()方法和清空等待執行緒連結串列等
finishCompletion();
}
}
可以看到setException(Throwable t)
的程式碼邏輯跟前面的set(V v)
幾乎一樣,不同的是任務執行過程中丟擲異常,此時是將異常儲存到FutureTask
的成員變數outcome
中,還有,值得注意的是這裡對應的任務狀態變化是NEW -> COMPLETING -> EXCEPTIONAL。
因為非同步任務不管正常還是異常結束,此時都會呼叫FutureTask
的finishCompletion
方法來喚醒喚醒阻塞的執行緒,這裡阻塞的執行緒是指我們呼叫Future.get
方法時若非同步任務還未執行完,此時該執行緒會阻塞。
// FutureTask.java
private void finishCompletion() {
// assert state > COMPLETING;
// 取出等待執行緒連結串列頭節點,判斷頭節點是否為null
// 1)若執行緒連結串列頭節點不為空,此時以“後進先出”的順序(棧)移除等待的執行緒WaitNode節點
// 2)若執行緒連結串列頭節點為空,說明還沒有執行緒呼叫Future.get()方法來獲取任務執行結果,固然不用移除
for (WaitNode q; (q = waiters) != null;) {
// 呼叫UNSAFE的CAS方法將成員變數waiters設定為空
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 取出WaitNode節點的執行緒
Thread t = q.thread;
// 若取出的執行緒不為null,則將該WaitNode節點執行緒置空,且喚醒正在阻塞的該執行緒
if (t != null) {
q.thread = null;
//【重要】喚醒正在阻塞的該執行緒
LockSupport.unpark(t);
}
// 繼續取得下一個WaitNode執行緒節點
WaitNode next = q.next;
// 若沒有下一個WaitNode執行緒節點,說明已經將所有等待的執行緒喚醒,此時跳出for迴圈
if (next == null)
break;
// 將已經移除的執行緒WaitNode節點的next指標置空,此時好被垃圾回收
q.next = null; // unlink to help gc
// 再把下一個WaitNode執行緒節點置為當前執行緒WaitNode頭節點
q = next;
}
break;
}
}
// 不管任務正常執行還是丟擲異常,都會呼叫done方法
done();
// 因為非同步任務已經執行完且結果已經儲存到outcome中,因此此時可以將callable物件置空了
callable = null; // to reduce footprint
}
finishCompletion
方法的作用就是不管非同步任務正常還是異常結束,此時都要喚醒且移除執行緒等待連結串列的等待執行緒節點,這個連結串列實現的是一個是Treiber stack
,因此喚醒(移除)的順序是"後進先出"即後面先來的執行緒先被先喚醒(移除),關於這個執行緒等待連結串列是如何成鏈的,後面再繼續分析。
4.4.2 FutureTask的handlePossibleCancellationInterrupt方法
在4.4小節
分析的run
方法裡的最後有一個finally
塊,此時若任務狀態state >= INTERRUPTING
,此時說明有其他執行緒執行了cancel(true)
方法,此時需要讓出CPU
執行的時間片段給其他執行緒執行,我們來看下具體的原始碼:
// FutureTask.java
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
// 當任務狀態是INTERRUPTING時,此時讓出CPU執行的機會,讓其他執行緒執行
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
思考: 為啥任務狀態是
INTERRUPTING
時,此時就要讓出CPU執行的時間片段呢?還有為什麼要在義務任務執行後才呼叫handlePossibleCancellationInterrupt
方法呢?
4.5 FutureTask.get方法,獲取任務執行結果
前面我們起一個執行緒在其`run`方法中執行非同步任務後,此時我們可以呼叫`FutureTask.get`方法來獲取非同步任務執行的結果。
// FutureTask.java
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 【1】若任務狀態<=COMPLETING,說明任務正在執行過程中,此時可能正常結束,也可能遇到異常
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 【2】最後根據任務狀態來返回任務執行結果,此時有三種情況:1)任務正常執行;2)任務執行異常;3)任務被取消
return report(s);
}
可以看到,如果任務狀態state<=COMPLETING
,說明非同步任務正在執行過程中,此時會呼叫awaitDone
方法阻塞等待;當任務執行完後,此時再呼叫report
方法來報告任務結果,此時有三種情況:1)任務正常執行;2)任務執行異常;3)任務被取消。
4.5.1 FutureTask.awaitDone方法
FutureTask.awaitDone
方法會阻塞獲取非同步任務執行結果的當前執行緒,直到非同步任務執行完成。
// FutureTask.java
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 計算超時結束時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 執行緒連結串列頭節點
WaitNode q = null;
// 是否入隊
boolean queued = false;
// 死迴圈
for (;;) {
// 如果當前獲取任務執行結果的執行緒被中斷,此時移除該執行緒WaitNode連結串列節點,並丟擲InterruptedException
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 【5】如果任務狀態>COMPLETING,此時返回任務執行結果,其中此時任務可能正常結束(NORMAL),可能丟擲異常(EXCEPTIONAL)
// 或任務被取消(CANCELLED,INTERRUPTING或INTERRUPTED狀態的一種)
if (s > COMPLETING) {
// 【問】此時將當前WaitNode節點的執行緒置空,其中在任務結束時也會呼叫finishCompletion將WaitNode節點的thread置空,
// 這裡為什麼又要再呼叫一次q.thread = null;呢?
// 【答】因為若很多執行緒來獲取任務執行結果,在任務執行完的那一刻,此時獲取任務的執行緒要麼已經線上程等待連結串列中,要麼
// 此時還是一個孤立的WaitNode節點。線上程等待連結串列中的的所有WaitNode節點將由finishCompletion來移除(同時喚醒)所有
// 等待的WaitNode節點,以便垃圾回收;而孤立的執行緒WaitNode節點此時還未阻塞,因此不需要被喚醒,此時只要把其屬性置為
// null,然後其有沒有被誰引用,因此可以被GC。
if (q != null)
q.thread = null;
// 【重要】返回任務執行結果
return s;
}
// 【4】若任務狀態為COMPLETING,此時說明任務正在執行過程中,此時獲取任務結果的執行緒需讓出CPU執行時間片段
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 【1】若當前執行緒還沒有進入執行緒等待連結串列的WaitNode節點,此時新建一個WaitNode節點,並把當前執行緒賦值給WaitNode節點的thread屬性
else if (q == null)
q = new WaitNode();
// 【2】若當前執行緒等待節點還未入執行緒等待佇列,此時加入到該執行緒等待佇列的頭部
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 若有超時設定,那麼處理超時獲取任務結果的邏輯
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 【3】若沒有超時設定,此時直接阻塞當前執行緒
else
LockSupport.park(this);
}
}
FutureTask.awaitDone
方法主要做的事情總結如下:
- 首先
awaitDone
方法裡面是一個死迴圈; - 若獲取結果的當前執行緒被其他執行緒中斷,此時移除該執行緒WaitNode連結串列節點,並丟擲InterruptedException;
- 如果任務狀態
state>COMPLETING
,此時返回任務執行結果; - 若任務狀態為
COMPLETING
,此時獲取任務結果的執行緒需讓出CPU執行時間片段; - 若
q == null
,說明當前執行緒還未設定到WaitNode
節點,此時新建WaitNode
節點並設定其thread
屬性為當前執行緒; - 若
queued==false
,說明當前執行緒WaitNode
節點還未加入執行緒等待連結串列,此時加入該連結串列的頭部; - 當
timed
設定為true時,此時該方法具有超時功能,關於超時的邏輯這裡不詳細分析; - 當前面6個條件都不滿足時,此時阻塞當前執行緒。
我們分析到這裡,可以直到執行非同步任務只能有一個執行緒來執行,而獲取非同步任務結果可以多執行緒來獲取,當非同步任務還未執行完時,此時獲取非同步任務結果的執行緒會加入執行緒等待連結串列中,然後呼叫呼叫LockSupport.park(this);
方法阻塞當前執行緒。直到非同步任務執行完成,此時會呼叫finishCompletion
方法來喚醒並移除執行緒等待連結串列的每個WaitNode
節點,這裡這裡喚醒(移除)WaitNode
節點的執行緒是從連結串列頭部開始的,前面我們也已經分析過。
還有一個特別需要注意的就是awaitDone
方法裡面是一個死迴圈,當一個獲取非同步任務的執行緒進來後可能會多次進入多個條件分支執行不同的業務邏輯,也可能只進入一個條件分支。下面分別舉兩種可能的情況進行說明:
情況1:
當獲取非同步任務結果的執行緒進來時,此時非同步任務還未執行完即state=NEW
且沒有超時設定時:
- 第一次迴圈:此時
q = null
,此時進入上面程式碼標號【1】
的判斷分支,即為當前執行緒新建一個WaitNode
節點; - 第二次迴圈:此時
queued = false
,此時進入上面程式碼標號【2】
的判斷分支,即將之前新建的WaitNode
節點加入執行緒等待連結串列中; - 第三次迴圈:此時進入上面程式碼標號
【3】
的判斷分支,即阻塞當前執行緒; - 第四次迴圈:加入此時非同步任務已經執行完,此時進入上面程式碼標號
【5】
的判斷分支,即返回非同步任務執行結果。
情況2:
當獲取非同步任務結果的執行緒進來時,此時非同步任務已經執行完即state>COMPLETING
且沒有超時設定時,此時直接進入上面程式碼標號【5】
的判斷分支,即直接返回非同步任務執行結果即可,也不用加入執行緒等待連結串列了。
4.5.2 FutureTask.report方法
在get
方法中,當非同步任務執行結束後即不管非同步任務正常還是異常結束,亦或是被cancel
,此時獲取非同步任務結果的執行緒都會被喚醒,因此會繼續執行FutureTask.report
方法報告非同步任務的執行情況,此時可能會返回結果,也可能會丟擲異常。
// FutureTask.java
private V report(int s) throws ExecutionException {
// 將非同步任務執行結果賦值給x,此時FutureTask的成員變數outcome要麼儲存著
// 非同步任務正常執行的結果,要麼儲存著非同步任務執行過程中丟擲的異常
Object x = outcome;
// 【1】若非同步任務正常執行結束,此時返回非同步任務執行結果即可
if (s == NORMAL)
return (V)x;
// 【2】若非同步任務執行過程中,其他執行緒執行過cancel方法,此時丟擲CancellationException異常
if (s >= CANCELLED)
throw new CancellationException();
// 【3】若非同步任務執行過程中,丟擲異常,此時將該異常轉換成ExecutionException後,重新丟擲。
throw new ExecutionException((Throwable)x);
}
4.6 FutureTask.cancel方法,取消執行任務
我們最後再來看下FutureTask.cancel
方法,我們一看到FutureTask.cancel
方法,肯定一開始就天真的認為這是一個可以取消非同步任務執行的方法,如果我們這樣認為的話,只能說我們猜對了一半。
// FutureTask.java
public boolean cancel(boolean mayInterruptIfRunning) {
// 【1】判斷當前任務狀態,若state == NEW時根據mayInterruptIfRunning引數值給當前任務狀態賦值為INTERRUPTING或CANCELLED
// a)當任務狀態不為NEW時,說明非同步任務已經完成,或丟擲異常,或已經被取消,此時直接返回false。
// TODO 【問題】此時若state = COMPLETING呢?此時為何也直接返回false,而不能發出中斷非同步任務執行緒的中斷訊號呢??
// TODO 僅僅因為COMPLETING是一個瞬時態嗎???
// b)當前僅當任務狀態為NEW時,此時若mayInterruptIfRunning為true,此時任務狀態賦值為INTERRUPTING;否則賦值為CANCELLED。
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 【2】如果mayInterruptIfRunning為true,此時中斷執行非同步任務的執行緒runner(還記得執行非同步任務時就把執行非同步任務的執行緒就賦值給了runner成員變數嗎)
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中斷執行非同步任務的執行緒runner
t.interrupt();
} finally { // final state
// 最後任務狀態賦值為INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
// 【3】不管mayInterruptIfRunning為true還是false,此時都要呼叫finishCompletion方法喚醒阻塞的獲取非同步任務結果的執行緒並移除執行緒等待連結串列節點
} finally {
finishCompletion();
}
// 返回true
return true;
}
以上程式碼中,當非同步任務狀態state != NEW
時,說明非同步任務已經正常執行完或已經異常結束亦或已經被cancel
,此時直接返回false
;當非同步任務狀態state = NEW
時,此時又根據mayInterruptIfRunning
引數是否為true
分為以下兩種情況:
- 當
mayInterruptIfRunning = false
時,此時任務狀態state
直接被賦值為CANCELLED
,此時不會對執行非同步任務的執行緒發出中斷訊號,值得注意的是這裡對應的任務狀態變化是NEW -> CANCELLED。 - 當
mayInterruptIfRunning = true
時,此時會對執行非同步任務的執行緒發出中斷訊號,值得注意的是這裡對應的任務狀態變化是NEW -> INTERRUPTING -> INTERRUPTED。
最後不管mayInterruptIfRunning
為true
還是false
,此時都要呼叫finishCompletion
方法喚醒阻塞的獲取非同步任務結果的執行緒並移除執行緒等待連結串列節點。
從FutureTask.cancel
原始碼中我們可以得出答案,該方法並不能真正中斷正在執行非同步任務的執行緒,只能對執行非同步任務的執行緒發出中斷訊號。如果執行非同步任務的執行緒處於sleep
、wait
或join
的狀態中,此時會丟擲InterruptedException
異常,該執行緒可以被中斷;此外,如果非同步任務需要在while
迴圈執行的話,此時可以結合以下程式碼來結束非同步任務執行緒,即執行非同步任務的執行緒被中斷時,此時Thread.currentThread().isInterrupted()
返回true
,不滿足while
迴圈條件因此退出迴圈,結束非同步任務執行執行緒,如下程式碼:
public Integer call() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
// 業務邏輯程式碼
System.out.println("running...");
}
return 666;
}
注意:呼叫了FutureTask.cancel
方法,只要返回結果是true
,假如非同步任務執行緒雖然不能被中斷,即使非同步任務執行緒正常執行完畢,返回了執行結果,此時呼叫FutureTask.get
方法也不能夠獲取非同步任務執行結果,此時會丟擲CancellationException
異常。請問知道這是為什麼嗎?
因為呼叫了FutureTask.cancel
方法,只要返回結果是true
,此時的任務狀態為CANCELLED
或INTERRUPTED
,同時必然會執行finishCompletion
方法,而finishCompletion
方法會喚醒獲取非同步任務結果的執行緒等待列表的執行緒,而獲取非同步任務結果的執行緒喚醒後發現狀態s >= CANCELLED
,此時就會丟擲CancellationException
異常了。
5 總結
好了,本篇文章對FutureTask
的原始碼分析就到此結束了,下面我們再總結下FutureTask
的實現邏輯:
- 我們實現
Callable
介面,在覆寫的call
方法中定義需要執行的業務邏輯; - 然後把我們實現的
Callable
介面實現物件傳給FutureTask
,然後FutureTask
作為非同步任務提交給執行緒執行; - 最重要的是
FutureTask
內部維護了一個狀態state
,任何操作(非同步任務正常結束與否還是被取消)都是圍繞著這個狀態進行,並隨時更新state
任務的狀態; - 只能有一個執行緒執行非同步任務,當非同步任務執行結束後,此時可能正常結束,異常結束或被取消。
- 可以多個執行緒併發獲取非同步任務執行結果,當非同步任務還未執行完,此時獲取非同步任務的執行緒將加入執行緒等待列表進行等待;
- 當非同步任務執行緒執行結束後,此時會喚醒獲取非同步任務執行結果的執行緒,注意喚醒順序是"後進先出"即後面加入的阻塞執行緒先被喚醒。
- 當我們呼叫
FutureTask.cancel
方法時並不能真正停止執行非同步任務的執行緒,只是發出中斷執行緒的訊號。但是隻要cancel
方法返回true
,此時即使非同步任務能正常執行完,此時我們呼叫get
方法獲取結果時依然會丟擲CancellationException
異常。
擴充套件: 前面我們提到了
FutureTask
的runner
,waiters
和state
都是用volatile
關鍵字修飾,說明這三個變數都是多執行緒共享的物件(成員變數),會被多執行緒操作,此時用volatile
關鍵字修飾是為了一個執行緒操作volatile
屬性變數值後,能夠及時對其他執行緒可見。此時多執行緒操作成員變數僅僅用了volatile
關鍵字仍然會有執行緒安全問題的,而此時Doug Lea老爺子沒有引入任何執行緒鎖,而是採用了Unsafe
的CAS
方法來代替鎖操作,確保執行緒安全性。
6 分析FutureTask原始碼,我們能學到什麼?
我們分析原始碼的目的是什麼?除了弄懂FutureTask
的內部實現原理外,我們還要借鑑大佬寫寫框架原始碼的各種技巧,只有這樣,我們才能成長。
分析了FutureTask
原始碼,我們可以從中學到:
- 利用
LockSupport
來實現執行緒的阻塞\喚醒機制; - 利用
volatile
和UNSAFE
的CAS
方法來實現執行緒共享變數的無鎖化操作; - 若要編寫超時異常的邏輯可以參考
FutureTask
的get(long timeout, TimeUnit unit)
的實現邏輯; - 多執行緒獲取某一成員變數結果時若需要等待時的執行緒等待連結串列的邏輯實現;
- 某一非同步任務在某一時刻只能由單一執行緒執行的邏輯實現;
FutureTask
中的任務狀態satate
的變化處理的邏輯實現。- ...
以上列舉的幾點都是我們可以學習參考的地方。
若您覺得不錯,請無情的轉發和點贊吧!
【原始碼筆記】Github地址:
https://github.com/yuanmabiji/Java-SourceCode-Blogs
公眾號【原始碼筆記】,專注於Java後端系列框架的原始碼分析。