[Java併發] AQS抽象佇列同步器原始碼解析--鎖獲取過程
要深入瞭解java併發知識,AbstractQueuedSynchronizer(AQS)是必須要拿出來深入學習的,AQS可以說是貫穿了整個JUC併發包,例如ReentrantLock,CountDownLatch,CyclicBarrier等併發類都涉及到了AQS。接下來就對AQS的實現原理進行分析。
在開始分析之前,勢必先將CLH同步隊列了解一下
CLH同步佇列
CLH自旋鎖: CLH(Craig, Landin, and Hagersten locks): 是一個自旋鎖,能確保無飢餓性,提供先來先服務的公平性。CLH自旋鎖是一種基於隱式連結串列(節點裡面沒有next指標)的可擴充套件、高效能、公平的自旋鎖,申請執行緒只在本地變數上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋。
AQS中的CLH同步佇列:AQS中CLH同步佇列是對CLH自旋鎖進行了優化,其主要從兩方面進行了改造:節點的結構與節點等待機制。
1.在結構上引入了頭結點和尾節點,他們分別指向佇列的頭和尾,嘗試獲取鎖、入佇列、釋放鎖等實現都與頭尾節點相關,並且每個節點都引入前驅節點和後後續節點的引用;
2.在等待機制上由原來的自旋改成阻塞喚醒。
原始碼中CLH的簡單表示
* +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+
Node就是實現CLH同步佇列的資料結構,計算下就瞭解下該類的相關欄位屬性
AQS中重要的內部類Node
static final class Node { // 共享模式 static final Node SHARED = new Node(); // 獨佔模式 static final Node EXCLUSIVE = null; // 如果屬性waitStatus == Node.CANCELLED,則表明該節點已經被取消 static final int CANCELLED = 1; // 如果屬性waitStatus == Node.SIGNAL,則表明後繼節點等待被喚醒 static final int SIGNAL = -1; // 如果屬性waitStatus == Node.CONDITION,則表明是Condition模式中的節點等待條件喚醒 static final int CONDITION = -2; // 如果屬性waitStatus == Node.PROPAGATE,在共享模式下,傳播式喚醒後繼節點 static final int PROPAGATE = -3; // 用於標記當前節點的狀態,取值為1,-1,-2,-3,分別對應以上4個取值 volatile int waitStatus; // 前驅節點 volatile Node prev; // 後繼節點 volatile Node next; // 當前節點對應的執行緒,阻塞與喚醒的執行緒 volatile Thread thread; // 使用Condtion時(共享模式下)的後繼節點,在獨佔模式中不會使用 Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
下面就開始著重對AQS中的重要方法進行分析說明
獲取鎖
1.acquire 開始獲取鎖
public final void acquire(int arg) {
//如果tryAcquire返回true,即獲取到鎖就停止執行,否則繼續向下執行向同步佇列尾部新增節點
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire是用於獲取鎖的方法,在AQS中預設沒有實現具體邏輯,由子類自定義實現。
如果返回true則說明獲取到鎖,否則需要將當前執行緒封裝為Node節點新增到同步佇列尾部。
2.當前節點入佇列
將當前執行的執行緒封裝為Node節點並加入到隊尾
private Node addWaiter(Node mode) {// 首先嚐試快速新增到隊尾,失敗再正常執行新增到隊尾
Node node = new Node(Thread.currentThread(), mode);
// 快速方式嘗試直接新增到隊尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果快速新增到隊尾失敗則執行enq(node)新增到隊尾
enq(node);
return node;
}
enq方法迴圈遍歷新增到隊尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 新增到佇列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter(Node mode)方法執行完後,接下來執行acquireQueued方法, 返回的是該執行緒是否需要中斷,該方法也是不停地迴圈獲取鎖,如果前節點是頭節點,則嘗試獲取鎖,獲取鎖成功則返回是否需要中斷標誌,如果獲取鎖失敗,則判斷是否需要阻塞並阻塞執行緒
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 標記是否需要被中斷
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驅節點是頭節點,並且獲取鎖成功,則返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判斷獲取鎖失敗後是否需要阻塞當前執行緒,如果阻塞執行緒後再判斷是否需要被中斷執行緒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(p, node)方法判斷是否需要阻塞當前執行緒
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果ws == Node.SIGNAL,則說明當前執行緒已經準備好被喚醒,因此現在可以被阻塞,之後等待被喚醒
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果ws > 0,說明當前節點已經被取消,因此迴圈剔除ws>0的前驅節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果ws<=0,則將標誌位設定為Node.SIGNAL,當還不可被阻塞,需要的等待下次執行shouldParkAfterFailedAcquire判斷是否需要阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果shouldParkAfterFailedAcquire(p, node)方法返回true,說明需要阻塞當前執行緒,則執行parkAndCheckInterrupt方法阻塞執行緒,並返回阻塞過程中執行緒是否被中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞執行緒,等待unpark()或interrupt()喚醒自己
// 執行緒被喚醒後檢視是否被中斷過。
return Thread.interrupted();
}
那麼重新回到獲取鎖的方法acquire方法,如果acquireQueued(final Node node, int arg)返回true,也即是阻塞過程中執行緒被中斷,則執行中斷執行緒操作selfInterrupt()
public final void acquire(int arg) {
//如果tryAcquire返回true,即獲取到鎖就停止執行,否則繼續向下執行向同步佇列尾部新增節點,然後判斷是否被中斷過,是則執行中斷
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
中斷當前執行緒
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
小結
AQS獲取鎖的過程:
1.執行tryAcquire方法獲取鎖,如果獲取鎖成功則直接返回,否則執行步驟2
2.執行addWaiter方法將當前執行緒封裝位Node節點並新增到同步佇列尾部,執行步驟3
3.執行acquireQueued迴圈嘗試獲取鎖,,如果獲取鎖成功,則判斷返回中斷標誌位,如果獲取鎖失敗則呼叫shouldParkAfterFailedAcquire方法判斷是否需要阻塞當前執行緒,如果需要阻塞執行緒則呼叫parkAndCheckInterrupt阻塞執行緒,並在被喚醒後再判斷再阻塞過程中是否被中斷過。
4.如果acquireQueued返回true,說明在阻塞過程中執行緒被中斷過,則執行selfInterrupt中斷執行緒
好了,以上就是AQS的鎖獲取過程,關於鎖釋放的分析會在後續繼續輸出