執行緒基礎知識09-FutureTask詳解
阿新 • • 發佈:2020-09-14
FutureTask詳解
作用:用於等待一個執行緒執行完畢後再執行另一個執行緒任務。一般用於Executors框架中,最常使用的是再ThreadPoolExecutor中,進行多執行緒任務;
注意:
-
JDK1.8不在使用AQS進行執行緒管理;
-
取而代之的是通過CAS進行狀態的切換,waiter執行緒節點由堆疊完成操作;
-
每次執行完或者有異常後,都會呼叫方法,重新喚醒後繼執行緒進行鎖競爭,從而進行重排序;
UML圖
FutureTask幾種狀態變更
幾種狀態
-
初始狀態是NEW;
-
任務執行中的狀態是COMPLETING;
-
呼叫cancel方法,會取消任務,呼叫cancel(true)方法會中斷任務執行;
幾種狀態的切換如下:
-
NEW -> COMPLETING -> NORMAL
-
NEW -> COMPLETING -> EXCEPTIONAL
-
NEW -> CANCELLED
-
NEW -> INTERRUPTING -> INTERRUPTED
狀態碼的值大小
private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
原始碼檢視
run 方法
-
如果當前執行緒佔有鎖並且是初始狀態,則進行任務執行,並返回結果;
-
如果出現錯誤,則更改狀態,並喚醒所有後繼執行緒;
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))//判斷狀態是否為NEW或者鎖佔有執行緒是否當前執行緒 return; try { Callable<V> c = callable;//獲取當前callbale任務 if (c != null && state == NEW) { V result; boolean ran; try { /** * 如果實現的是Runnable介面,要預設返回的result; */ result = c.call();//執行當前執行緒任務 ran = true;//任務執行成功 } catch (Throwable ex) {//任務執行失敗 result = null; ran = false; setException(ex); } if (ran)//如果任務執行成功返回執行結果 set(result); } } finally { /** * 將當前執行緒設定為空 * 如果當前執行緒是正在終端狀態,呼叫Thread.yeild()進行鎖釋放重競爭 */ runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * 本質還是呼叫Runnable的run方法,返回預給定的返回值 */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run();//相當於還是呼叫執行緒的run方法進行執行,沒什麼差別 return result; } } /** * 返回錯誤資訊,更改錯誤狀態,喚醒所有後繼執行緒 */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
set方法
-
更改當前節點狀態,並設定返回值;
-
喚醒所有後繼節點執行緒
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更改狀態為COMPLETING執行中
outcome = v;//增加返回值
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 更改狀態為NORMAL,執行完成
finishCompletion();
}
}
/**
* 作用:喚起所有後繼節點的執行緒
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {//有後繼節點的時候
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//後繼節點替換當前節點
for (;;) {
Thread t = q.thread;//獲取節點執行緒
if (t != null) {//執行緒不為空時
q.thread = null;
LockSupport.unpark(t);//喚醒執行緒
}
WaitNode next = q.next;//當前節點的後繼節點
if (next == null)
break;
q.next = null; // GC回收節點
q = next;//進行節點替換
}
break;
}
}
/**
* 這個方法是繼承類去實現,完成執行緒任務或者取消執行緒任務
*/
done();
callable = null;
}
get方法
-
作用:獲取當前執行緒返回的資料;
-
方法執行的過程:
-
如果是初始狀態NEW,則建立當前執行緒為新的節點,並關聯後繼節點。
-
阻塞節點直到節點執行緒任務完成,或者丟擲異常;
-
返回當前執行緒執行資料;
-
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)//如果是初始狀態時
s = awaitDone(false, 0L);//任務執行,並返回執行狀態
return report(s);//用於返回資料
}
/**
* 執行的過程時:
* 1.判斷當前節點的狀態如果是終端,則移除當前節點丟擲異常 ;
* 2.如果節點執行完成,則將執行緒任務回收,並返回任務執行狀態;
* 3.如果當前節點為空,則建立節點
* 4.如果當前節點沒有在佇列中,則將當前節點加入到佇列中
* 5.有超時限制的話,判斷是否超時,超時則移出,否則進行阻塞
* 6.非以上情況,則進行阻塞,直到被喚醒。
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;//是否設定超時時間
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//如果執行緒已經終端,移除節點,並丟擲異常
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {//如果大於COMPLETING說明任務已經執行
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // 如果狀態時COMPLETING,讓出CPU時間,並重新嘗試獲得
Thread.yield();
else if (q == null)//如果為空,則建立新的節點
q = new WaitNode();
else if (!queued)//如果沒有後繼節點,當前節點通過unsafe設定當前節點的後繼節點
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {//設定了超時的化,超時後,將當前節點進行移除
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);//如果沒有超時,則阻塞當前節點固定的時間
}
else
LockSupport.park(this);//阻塞當前節點
}
}
/**
* 移除等待的節點
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
/**
* 作用:用於返回結果
*/
private V report(int s) throws ExecutionException {
Object x = outcome;//返回的值
if (s == NORMAL)//正常返回
return (V)x;
if (s >= CANCELLED)//返回異常
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
cancel方法
-
初始狀態,根據傳入的boolean值,如果不傳值,預設取消當前節點,喚起後繼執行緒
-
如果傳值true,表示中斷,中斷任務後,喚起後繼節點執行緒;
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//表示中斷當前執行緒任務
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);//更改節點狀態
}
}
} finally {
finishCompletion();//喚醒後繼節點執行緒
}
return true;
}
總結
1.所以為什麼不用AQS而改用CAS?
其實註解已經給出了答案:
Revision notes: This differs from previous versions of this
class that relied on AbstractQueuedSynchronizer, mainly to
avoid surprising users about retaining interrupt status during
cancellation races.
我們看一下AQS中斷式獲取鎖的方法;
- 假設第一次獲取鎖,並沒有獲取到;
- 加到同步佇列的過程中中斷異常了;
- 獲取中斷狀態(會清空當前執行緒中斷標識),又丟擲中斷異常
- 如果再來獲取一次,又會重複一次。這種容易對開發人員造成困惑。所以改CAS方式了;
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判斷如果中斷了
throw new InterruptedException();//再丟擲中斷異常
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//。。。。。。。。。。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();//丟擲中斷異常
}
} finally {
//。。。。。
}
}