【死磕 Java 併發】—– J.U.C 之 AQS:同步狀態的獲取與釋放
摘要: 原創出處 http://cmsblogs.com/?p=2197 「小明哥」歡迎轉載,保留摘要,謝謝!
在前面提到過,AQS 是構建 Java 同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。
AQS 的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態。對於子類而言,它並沒有太多的活要做,AQS 已經提供了大量的模板方法來實現同步,主要是分為三類:
-
獨佔式獲取和釋放同步狀態
-
共享式獲取和釋放同步狀態
-
查詢同步佇列中的等待執行緒情況。
自定義子類使用 AQS 提供的模板方法,就可以實現自己的同步語義。
1. 獨佔式
獨佔式,同一時刻,僅有一個執行緒持有同步狀態
1.1 獨佔式同步狀態獲取
#acquire(int arg)
方法,為 AQS 提供的模板方法。該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感。也就是說,由於執行緒獲取同步狀態失敗而加入到 CLH 同步佇列中,後續對該執行緒進行中斷操作時,執行緒不會從 CLH 同步佇列中移除。程式碼如下
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
第 2 行:呼叫 #tryAcquire(int arg)
方法,去嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。若獲取成功,#acquire(int arg)
方法,直接返回,不用執行緒阻塞,自旋直到獲得同步狀態成功。
#tryAcquire(int arg)
方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
直接丟擲 UnsupportedOperationException 異常。
第 3 行:如果 #tryAcquire(int arg)
方法返回 false ,即獲取同步狀態失敗,則呼叫 #addWaiter(Node mode)
方法,將當前執行緒加入到 CLH 同步佇列尾部。並且, mode
方法引數為 Node.EXCLUSIVE
,表示獨佔模式。
第 3 行:呼叫 boolean #acquireQueued(Node node, int arg)
方法,自旋直到獲得同步狀態成功。詳細解析,見 「1.1.1 acquireQueued」 中。另外,該方法的返回值型別為 boolean
,當返回 true 時,表示在這個過程中,發生過執行緒中斷。但是呢,這個方法又會清理執行緒中斷的標識,所以在種情況下,需要呼叫【第 4 行】的 #selfInterrupt() 方法,恢復執行緒中斷的標識,程式碼如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
1.1.1 acquireQueued
boolean #acquireQueued(Node node, int arg)
方法,為一個自旋的過程,也就是說,當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。
流程圖如下:
程式碼如下:
final boolean acquireQueued(final Node node, int arg) {
// 記錄是否獲取同步狀態成功
boolean failed = true;
try {
// 記錄過程中,是否發生執行緒中斷
boolean interrupted = false;
/*
* 自旋過程,其實就是一個死迴圈而已
*/
for (;;) {
// 當前執行緒的前驅節點
final Node p = node.predecessor();
// 當前執行緒的前驅節點是頭結點,且同步狀態成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 獲取失敗,執行緒等待--具體後面介紹
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 獲取同步狀態發生異常,取消獲取。
if (failed)
cancelAcquire(node);
}
}
1.1.2 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲得前一個節點的等待狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 第 4 至 9 行:等待狀態為
Node.SIGNAL
時,表示pred
的下一個節點node
的執行緒需要阻塞等待。在pred
的執行緒釋放同步狀態時,會對node
的執行緒進行喚醒通知。所以,【第 9 行】返回 true ,表明當前執行緒可以被 park,安全的阻塞等待。