深入java併發包原始碼(三)AQS獨佔方法原始碼分析
AQS 的實現原理
學完用 AQS 自定義一個鎖以後,我們可以來看一下剛剛使用過的方法的實現。
分析原始碼的時候會省略一些不重要的程式碼。
AQS 的實現是基於一個 FIFO 佇列的,每一個等待的執行緒被封裝成 Node
存放在等待佇列中,頭結點是空的,不儲存資訊,等待佇列中的節點都是阻塞的,並且在每次被喚醒後都會檢測自己的前一個節點是否為頭結點,如果是頭節點證明在這個執行緒之前沒有在等待的執行緒,就嘗試著去獲取共享資源。
AQS 的繼承關係
AQS 繼承了AbstractOwnableSynchronizer
,我們先分析一下這個父類。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractOwnableSynchronizer() { } /** * 獨佔模式下的執行緒 */ private transient Thread exclusiveOwnerThread; /** * 設定執行緒,只是對執行緒的 set 方法 */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * 設定執行緒,對執行緒的 get 方法 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
父類非常簡單,持有一個獨佔模式下的執行緒,然後就只剩下對這個執行緒的 get 和 set 方法。
AQS的內部類
AQS 是用連結串列佇列來實現執行緒等待的,那麼佇列肯定要有節點,我們先從節點講起。
Node 類,每一個等待的執行緒都會被封裝成 Node 類
Node 的域
public class Node {
int waitStatus;
Node prev;
Node next;
Thread thread;
Node nextWaiter;
}
waitStatus:等待狀態
prev:前驅節點
next:後繼節點
thread:持有的執行緒
nextWaiter:condiction 佇列中的後繼節點
Node 的 status:
Node 的狀態有四種:
- CANCELLED,值為 1,表示當前的執行緒被取消,被打斷或者獲取超時了
- SIGNAL,值為 -1,表示當前節點的後繼節點包含的執行緒需要執行,也就是 unpark;
- CONDITION,值為 -2,表示當前節點在等待 condition,也就是在 condition 佇列中;
- PROPAGATE,值為 -3,表示當前場景下後續的 acquireShared 能夠得以執行;
取消狀態的值是唯一的正數,也是唯一當排隊排到它了也不要資源而是直接輪到下個執行緒來獲取資源的
AQS 中的方法原始碼分析
acquire
這個方法執行了:
- 使用我們實現的
tryAcquire
來嘗試獲得鎖,如果獲得了鎖則退出 - 如果沒有獲得鎖就將執行緒新增到等待佇列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
看到上面的 tryAcquire 返回 false 後就會呼叫 addWaiter
新建節點加入等待佇列中。引數 EXCLUSIVE 是獨佔模式。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 拿到尾節點,如果尾節點是空則說明是第一個節點,就直接入隊就好了
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾節點不是空的,則需要特殊方法入隊
enq(node);
return node;
}
在 addWaiter
方法建立完節點後,呼叫 enq 方法,在迴圈中用 CAS 操作將新的節點入隊。
因為可能會有多個執行緒同時設定尾節點,所以需要放在迴圈中不斷的設定尾節點。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 檢視尾節點
if (t == null) { // Must initialize
// 尾節點為空則設定為剛剛建立的節點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾節點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在這裡,節點入隊就結束了。
那麼我們回來前面分析的方法,
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
剛剛分析完了 addWaiter
方法,這個方法返回了剛剛建立並且加入的佇列。現在開始分析 acquireQueued
方法。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 在迴圈中去獲取鎖
for (;;) {
// 拿前一個節點
final Node p = node.predecessor();
// 如果前一個節點是頭結點則嘗試著去獲取鎖
if (p == head && tryAcquire(arg)) {
// 拿到鎖後把這個節點設為頭結點,這裡 setHead 會把除了 next 以外的資料清除
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 這個方法檢視在獲取鎖失敗以後是否中斷,如果否的話就呼叫
// parkAndCheckInterrupt 阻塞方法執行緒,等待被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireInterruptibly
因為很像所以順便來看一下 acquireInterruptibly
所呼叫的方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 只有這一句有差別,獲取失敗了並且檢測到中斷位被設為 true 直接丟擲異常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireNanos
再來看一下有限時間的,當獲取超時以後會將節點 Node 的狀態設為 cancel,設定為取消的用處在後面的 release 方法中會有體現。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
總結一下過程
release
這個方法首先去呼叫了我們實現的 tryRelease,當結果返回成功的時候,拿到頭結點,呼叫 unparkSuccessor 方法來喚醒頭結點的下一個節點。
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitSatus;
// 因為已經獲取過鎖,所以將狀態設設為 0。失敗也沒所謂,說明有其他的執行緒把它設為0了
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 一般來說頭結點的下一個節點是在等待著被喚醒的,但是如果是取消的或者意外的是空的,
* 則向後遍歷直到找到沒有被取消的節點
*
*/
Node s = node.next;
// 為空或者大於 0,只有 cancel 狀態是大於 0 的
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}