Java AQS源碼閱讀
AQS源碼詳解
源碼分析維度:同步隊列、獨占式同步狀態獲取與釋放、共享式同步狀態獲取與釋放以及超時獲取同步狀態等同步器的核心數據結構與模板方法。
同步隊列介紹
同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構建成為一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。
同步隊列中的節點(Node)是用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和後繼節點,節點的屬性類型與名稱以及描述見下圖:
同步隊列的基本結構如下圖所示:
同步器包含了兩個節點類型的引用,一個指向頭節點,而另一個指向尾節點。當一個線程成功地獲取了同步狀態(或者鎖),其他線程將無法獲取到同步狀態,轉而被構造成為節點並加入到同步隊列中,而這個加入隊列的過程必須要保證線程安全,因此同步器提供了一個基於CAS的設置尾節點的方法:compareAndSetTail(Node expect , Node update)
同步器將節點加入到同步隊列的過程如圖5-2所示:
同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時,將會喚醒後繼節點,而後續節點將會在獲取同步狀態成功時將自己設置為首節點,該過程如下圖:
設置首節點是通過獲取同步狀態成功的線程來完成的,由於只有一個線程能夠成功獲取到同步狀態,因此設置頭節點的方法並不需要使用CAS來保證,它只需要將首節點設置成為原首節點的後繼節點並斷開原首節點的next引用即可。
源碼閱讀
隊列構建:
private Node addWaiter (Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq (node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //初始化階段
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
首先,我們從addWaiter
方法開始看起,此方法是通過自旋的形式來實現無鎖情況下並發處理問題。假設初始化時,有多個線程進入此方法,此時head和tail都尚未初始化,都為null,則第一個線程一定會進入到enq(node)
方法裏。在進入到enq(node)
方法後,如果此時head和tail節點依舊尚未初始化,則會進入初始化head結點和tail節點的階段,即t == null
的if塊裏。此代碼段是通過CAS的形式將head的值賦予new Node()
返回的引用地址,再執行tail=head
,則此時同步隊列的情況如下圖:
第一個線程進入到enq(final Node node)
方法的線程執行完初始化操作後,後續的線程或者第一個線程的後續循環操作都會進入到enq
方法的 else
代碼段裏。
- 執行完
Node t = tail; node.prev = t
FIFO隊列的情況如下圖
- 執行完
compareAndSetTail(t, node)
後 FIFO隊列的情況如下圖:
- 執行完
t.next = node;
後 FIFO隊列的情況如下圖:
- 如果是還有另一個線程進入到
enq(final Node node)
方法中時 FIFO隊列的情況如下圖:
同步隊列中 節點喚醒邏輯:
//1. 獨占式
public final boolean release(int arg) {
if (tryRelease(arg)) { //tryRelease(arg) 調用子類的實現,讓子類決定是否釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//2. 共享式
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //調用子類的實現,讓子類決定是否釋放鎖
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 同步隊列至少有兩個節點(頭節點和業務節點)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 如果當前頭節點狀態為SIGNAL(暗示後續節點需要喚醒)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 喚醒頭結點後第一個需要喚醒的節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// 喚醒當前節點(Node node)的後繼節點
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
/**
* 1. 如果當前節點還處於有效的等待狀態,則將其狀態還原為初始化狀態。
* 2. 為啥要還原? 你都等到了,還等呀?肯定是從等待狀態還原回去!
* 3. 這裏不care返回值,說明是允許失敗的。
* 4. 此段邏輯只有在cancelAcquire方法調用時才用到,其他入口傳入的都是head節點,
* head節點的狀態到這裏都是0了,不信可以看看方法調用方。
*/
compareAndSetWaitStatus(node, ws, 0);
/**
* 1. 此塊邏輯的目的:找到第一個還處於有效等待狀態的節點,然後將它喚醒。
* 2. 先取頭結點的下一個節點,如果該節點為null或者已經取消,則從尾節點開始向前遍歷找到需要喚
* 醒的節點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 調用native的方法 執行喚醒線程的操作。
}
在AQS類中,tryRelease(int arg)
和tryReleaseShared((int arg))
方法體中,都只有一行代碼:throw new UnsupportedOperationException();
,從這裏,我們可以看出AQS其實是把何時釋放鎖的決定權交給了AQS具體的實現類來決定了,AQS在實現同步管理方面,更多的是扮演了工具類的角色。由於此處,我們關註的是同步隊列爭搶鎖的操作,故對於其他具體細節暫時不關註。
從上面的代碼中,我們可以看到在獨占式和共享式兩種情況下,釋放鎖之後都調用unparkSuccessor(h);
方法都是傳入的head節點,而unparkSuccessor
方法的作用是釋放當前節點的第一個有效後繼節點,從這個方面也可以看出同步隊列遵循了”先進先出“的原則。
同步隊列的釋放過程
// 以此舉例說明
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 同步隊列 出隊列和喚醒邏輯
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
同步隊列的出隊列操作入口其實並不止doAcquireSharedInterruptibly
這一種,此處只以它舉例,其他類似。當線程在獲取共享式的同步狀態失敗時,會進入到doAcquireSharedInterruptibly
方法內,當前獲取同步狀態失敗的線程會被封裝成一個node節點,然後檢測自己的prev
指向的是不是頭節點,如果是頭結點,則會調用setHeadAndPropagate
方法,否則會進入到shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
兩個方法內。
shouldParkAfterFailedAcquire
方法首先會判斷當前節點的前置節點是否是有效的等待狀態;如果waitStatus
> 0,即處於已取消狀態,則需要將該前置節點移除隊列。由於是do while
該方法會一直往前找,直到找到最近一個正常等待的節點,並排在它後面。
為什麽一定要找個處於正常等待狀態的節點才排在它後面?
因為從上面分析的unparkSuccessor
方法,我們可以知道隊列釋放喚醒操作,是從head節點開始,釋放的永遠當前節點的後繼節點,這樣如果前置節點處於取消狀態,後繼節點永遠不能被喚醒!!! 跟對老大很重要!!!
看到這裏有小夥伴可能會問:那要是跟完老大,老大才被KO掉咋辦??
Java AQS源碼閱讀