Java 併發程式設計(三)鎖與 AQS
本文 JDK 對應的版本為 JDK 13
由於傳統的 synchronized
關鍵字提供的內建鎖存在的一些缺點,自 JDK 1.5 開始提供了 Lock
介面來提供內建鎖不具備的功能。顯式鎖的出現不是為了替代 synchronized
提供的內建鎖,而是當內建鎖的機制不適用時,作為一種可選的高階功能
內建鎖與顯式鎖
內建鎖於顯式鎖的比較如下表:
類別 | synchronized | Lock |
---|---|---|
存在層次 | Java的關鍵字 | 是一個類 |
鎖的釋放 | 1、以獲取鎖的執行緒執行完同步程式碼,釋放鎖 2、執行緒執行發生異常,jvm會讓執行緒釋放鎖 |
在finally中必須釋放鎖, 不然容易造成執行緒死鎖 |
鎖的獲取 | 假設A執行緒獲得鎖,B執行緒等待。 如果A執行緒阻塞,B執行緒會一直等待 |
Lock有多個鎖獲取的方式 |
鎖狀態 | 無法判斷 | 可以判斷 |
鎖型別 | 可重入 不可中斷 非公平 | 可重入 可判斷 可公平(兩者皆可) |
效能 | 少量同步 | 大量同步 |
### 顯式鎖的基本使用
Lock
的定義如下:
public interface Lock { // 顯式地獲取鎖 void lock(); // 可中斷地獲取鎖,與 lock() 方法的不同之處在於在鎖的獲取過程可以被中斷 void lockInterruptibly() throws InterruptedException; // 以非阻塞的方式獲取鎖,呼叫該方法將會立即返回,如果成功獲取到鎖則返回 true,否則返回 false boolean tryLock(); /* 帶時間引數的 tryLock, 有三種情況:在規定時間內獲取到了鎖;在規定的時間內執行緒被中斷了;在規定的時間內沒有獲取到鎖 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 釋放鎖 void unlock(); /* 獲取 “等待/通知” 元件,該元件和當前的執行緒繫結,當前的執行緒只有獲取到了鎖, 才能呼叫該元件的 wait 方法,而呼叫之後,當前執行緒將會釋放鎖 */ Condition newCondition(); }
常用的 Lock
的實現類為 java.util.concurrent.locks.ReentrantLock
,使用的示例如下:
private final static Lock lock = new ReentrantLock();
static int value = 0;
static class Demo implements Runnable {
@Override
public void run() {
lock.lock();
try {
value++;
} finally { // 一定要講解鎖操作放入到 finally 中,否則有可能會造成死鎖
lock.unlock();
}
}
}
ReentrantLock
是基於 java.util.concurrent.locks.AbstractQueuedSynchronizer
的具體子類來實現同步的,這個類也被稱為 AQS
,是 JUC
中實現 Lock
最為核心的部分
AQS
構建同步類
使用 AQS
構建同步類時獲取鎖和釋放鎖的標準形式如下:[1]
boolean acquire() throws InterruptedException {
while (當前狀態不允許獲取操作) {
if (需要阻塞獲取請求) {
如果當前執行緒不在佇列中,則將其插入佇列
阻塞當前執行緒
} else {
返回失敗
}
}
可能更新同步器的狀態
如果執行緒位於佇列中,則將其移出佇列
返回成功
}
void release () {
更新同步器狀態
if (新的狀態允許某個阻塞的執行緒獲取成功) {
解除佇列中一個或多個執行緒的阻塞狀態
}
}
對於支援獨佔式的同步器,需要實現一些
protected
修飾的方法,包括tryAcquire
、tryRelease
、isHeldExclusively
等;對於支援共享式的同步器,應該實現的方法有
tryAcquireShared
、tryReleaseShared
等
AQS
的acquire
、acquireShared
和release
、releaseShared
等方法都將呼叫這些方法在子類中帶有的字首try
的版本來判斷某個操作能否被執行。在同步器的子類中,可以根據其獲取操作和釋放操作的語義,使用
getState
、setState
以及compareAndSetState
來檢查和更新狀態,並根據返回的狀態值來告知基類 “獲取” 和 “釋放” 同步的操作是否是成功的。
原始碼解析
AQS
的類結構圖如下:
類屬性分析
-
AQS
例項物件的屬性AQS
中存在非static
的欄位如下(static
欄位沒有分析的必要):// 頭節點,即當前持有鎖的執行緒 private transient volatile Node head; // 阻塞佇列的尾結點,每個新的節點進來都會插入到尾部 private transient volatile Node tail; /* 代表鎖的狀態,0 表示沒有被佔用,大於 0 表示有執行緒持有當前的鎖 這個值可以大於 1,因為鎖是可重入的,每次重入時都會將這個值 +1 */ private volatile int state; /* 這個屬性繼承自 AbstractOwnableSynchronizer, 表示當前持有獨佔鎖的執行緒 */ private transient Thread exclusiveOwnerThread;
-
佇列節點物件的屬性
static final class Node { // 標記當前的節點處於共享模式 static final Node SHARED = new Node(); // 表示當前的節點處於獨佔模式 static final Node EXCLUSIVE = null; // 這個值表示當前節點的執行緒已經被取消了 static final int CANCELLED = 1; // 表示當前節點的下一個節點需要被喚醒 static final int SIGNAL = -1; // 表示當前節點在等待一個條件 static final int CONDITION = -2; // 表示下一個 acquireShared 應當無條件地傳播 static final int PROPAGATE = -3; /* 當前節點的等待狀態,取值為上面的 CANCELLED、SIGNAL、CONDITION、PROPAGATE 或者 0 */ volatile int waitStatus; // 當前節點的前節點 volatile Node prev; // 當前節點的下一個節點 volatile Node next; // 當前節點儲存的執行緒 volatile Thread thread; // 連結到下一個等待條件的節點(條件佇列),或者是特殊值為 SHARED 的節點 Node nextWaiter; }
最後得到的阻塞佇列如下圖所示:
[2]
注意,這裡的阻塞佇列不包含頭結點 head
具體分析
-
acquire(int arg)
該方法位於
java.util.concurrent.locks.AbstractQueuedSynchronizer
中,具體對應的原始碼如下:public final void acquire(int arg) { /* 如果 tryAcquire(arg) 成功了(即嘗試獲取鎖成功了),那麼就直接獲取到了鎖 否則,就需要呼叫 acquireQueued 方法將這個執行緒放入到阻塞佇列中 */ if (!tryAcquire(arg) && // 如果嘗試獲取鎖沒有成功,那麼久將當前的執行緒掛起,放入到阻塞佇列中 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg)
對應的原始碼如下:// AbstractQueuedSynchronizer 中定義的。。。 protected boolean tryAcquire(int arg) { // 在 AbstractQueuedSynchronizer 中定義的模版方法,需要具體的子類來實現 throw new UnsupportedOperationException(); }
為了簡化這個過程,以
ReentrantLock
的FairSync
為例檢視具體的實現:// ReentrantLock.FairSync。。。 @ReservedStackAccess /* 嘗試直接獲取鎖,返回值為 boolean,表示是否獲取到鎖 返回為 true: 1.沒有執行緒在等待鎖 2.重入鎖,執行緒本來就持有鎖,因此可以再次獲取當前的鎖 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // state 為 0 表示此時沒有執行緒持有鎖 /* 當前的鎖為公平鎖(FairSync),因此即使當前鎖是可以獲取的, 但是需要首先檢查是否已經有別的執行緒在等待這個鎖 */ if (!hasQueuedPredecessors() && /* 如果沒有執行緒在等待,那麼則嘗試使用 CAS 修改狀態獲取鎖,如果成功,則獲取到當前的鎖 如果使用 CAS 獲取鎖失敗,那麼就說明幾乎在同一時刻有個執行緒搶先獲取了這個鎖 */ compareAndSetState(0, acquires)) { // 到這裡就已經獲取到鎖了,標記一下當前的鎖,表示已經被當前的執行緒佔用了 setExclusiveOwnerThread(current); return true; } } /* 如果已經有執行緒持有了當前的鎖,那麼首先需要檢測一下是不是當前執行緒持有的鎖 如果是當前執行緒持有的鎖,那麼就是一個重入鎖,需要對 state 變數 +1 否則,當前的鎖已經被其它執行緒持有了,獲取失敗 */ else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
現在再回到
acquire
方法,如果trAcquire(arg)
成功獲取到了鎖,那麼就是成功獲取到了鎖,直接返回即可;如果tryAcquire(arg)
獲取鎖失敗了,則再執行acquireQueued
方法將當前執行緒放入到阻塞佇列尾部在那之前,首先會執行
acquireQueued
方法中呼叫的addWaiter(Node.EXCLUSIVE)
方法,具體的原始碼如下:// AbstractQueuedSynchronizer /* 這個方法的作用是將當前的執行緒結合給定的 mode 組合成為一個 Node,以便插入到阻塞佇列的末尾 結合當前的上下文,傳入的 mode 為 Node.EXCLUSIVE,即獨佔鎖的模式 */ private Node addWaiter(Node mode) { Node node = new Node(mode); for (;;) { // 注意這裡的永真迴圈。。。 Node oldTail = tail; /* 如果尾結點不為 null,則使用 CAS 的方式將 node 插入到阻塞佇列的尾部 */ if (oldTail != null) { node.setPrevRelaxed(oldTail); // 設定當前 node 的前驅節點為原先的 tail 節點 if (compareAndSetTail(oldTail, node)) { // CAS 的方式設定尾結點 oldTail.next = node; return node; // 返回當前的節點 } } else { // 如果當前的阻塞佇列為空的話,那麼首先需要初始化阻塞佇列 initializeSyncQueue(); } } } // 初始化阻塞佇列對應的原始碼如下 private final void initializeSyncQueue() { Node h; // 依舊是使用 CAS 的方式,這裡的 h 的初始化為延遲初始化 if (HEAD.compareAndSet(this, null, (h = new Node()))) tail = h; }
之後就是執行
acquireQueued
方法了,對應的原始碼如下:// AbstractQueuedSynchronizer /* 此時的引數 node 已經經過 addWaiter 的處理,已經被新增到阻塞佇列的末尾了 如果 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 呼叫之後返回 true,那麼就會執行 acquire(int arg) 方法中的 selfInterrupt() 方法 這個方法是比較關鍵的部分,是真正處理執行緒掛起,然後被喚醒去獲取鎖,都在這個方法中定義 */ final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { // 注意這裡的永真迴圈 // predecessor() 返回的是當前 node 節點的前驅節點 final Node p = node.predecessor(); /* p == head 表示當前的節點雖然已經進入到了阻塞佇列,但是是阻塞佇列中的第一個元素(阻塞佇列不包含 head 節點) 因此當前的節點可以嘗試著獲取一下鎖,這是由於當前的節點是阻塞佇列的第一個節點,而 head 節點又是延遲初始化的,在這種情況下是有可能獲取到鎖的 */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } /* 如果執行到這個位置,則說明 node 要麼就不是隊頭元素,要麼就是嘗試獲取鎖失敗 */ if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } // parkAndCheckInterrupt() 對應的原始碼 /* 該方法的主要任務是掛起當前執行緒,使得當前執行緒在此等待被喚醒 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 該方法用於掛起當前執行緒 return Thread.interrupted(); }
shouldParkAfterFailedAcquire(p, node)
對應的原始碼如下:// AbstractQueuedSynchronizer /* 這個方法的主要任務是判斷當前沒有搶到鎖的執行緒是否需要阻塞 第一個引數表示當前節點的前驅節點,第二個引數表示當前執行緒的節點 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驅節點正常,則需要阻塞當前執行緒節點 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; /* 前驅節點的狀態值大於 0 表示前驅節點取消了排隊 如果當前的節點被阻塞了,喚醒它的為它的前驅節點,因此為了使得能夠正常工作, 需要將當前節點的前驅節點設定為一個正常的節點,使得當前的節點能夠被正常地喚醒 */ if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ /* 如果不滿足以上兩個條件,那麼當前的 ws 的狀態就只能為 0, -2, -3 了 在當前的上下文環境中,ws 的狀態為 0,因此這裡就是將當前節點的前驅節點的 ws 值設定為 Node.SIGNAL */ pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } /* 本次執行到此處會返回 false,而 acquireQueued 中的永真迴圈將會再次進入這個方法 由於上面的一系列操作,當前節點的前驅節點一定是正常的 Node.SIGNAL,因此會在第一個 if 語句中直接返回 true */ return false; }
-
release(int arg)
該方法用於釋放當前獲取到的鎖,對應的具體的原始碼如下:
// AbstractQueuedSynchronizer // 釋放在獨佔模式中獲取到的鎖 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease(arg)
對應的原始碼如下:// AbstractQueuedSynchronizer // 很明顯,這也是一個模版方法,需要具體子類來定義對應的實現 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
依舊以
ReentrantLock
為例,檢視一下tryRelease(int arg)
的具體實現// ReentrantLock.Sync @ReservedStackAccess protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 是否已經完全釋放鎖的標記 // 如果 c > 0,則說明獲取的鎖是一個重入鎖,還沒有完全釋放 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
再回到
release(int arg)
方法中,如果是已經完全釋放了鎖,則執行後面的return false
語句,執行結束。如果沒有完全釋放鎖,那麼則會繼續執行unparkSuccessor(h)
方法,對應的原始碼如下:// AbstractQueuedSynchronizer // 喚醒後繼節點 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ /* 喚醒後繼節點,但是可能後繼節點取消了等待(即 waitStatus = Node.CANCELLED) 在這種情況下,將會從隊尾向前查詢,找到最靠近 head 的 waitStatus < 0 的節點 */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 從隊尾開始向前查詢,找到第一個合適的節點 for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) // 可能排在前面的節點取消的可能性更大 s = p; } if (s != null) // 喚醒這個合適的節點對應的執行緒 LockSupport.unpark(s.thread); }
在釋放了所有的鎖之後,喚醒後繼的一個還沒有被取消的執行緒節點,然後喚醒它,喚醒之後的節點將恢復原來在
parkAndCheckInterrupt()
中的執行狀態private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 被喚醒後將繼續執行後面的程式碼 return Thread.interrupted(); // 此時應當是沒有被中斷的 }
再回到原先的
acquireQueued(node, arg)
方法,此時由於 head 已經釋放了鎖,而當前的 node 節點是距離 head 最近的一個有效的執行緒節點,因此它能夠獲取到鎖,執行緒在獲取鎖之後再繼續執行對應的程式碼邏輯
ConditionObject
ConditionObject
一般用於 “生產者—消費者” 的模式中,與基於Object
的 wait()
和 notifyAll()
實現的通訊機制十分類似。
對應的 ConditionObject
的原始碼如下:
public class ConditionObject implements Condition, java.io.Serializable {
// 條件佇列的第一個節點
private transient Node firstWaiter;
// 條件佇列的最後一個節點
private transient Node lastWaiter;
}
與前文的阻塞佇列相對應,條件佇列與阻塞佇列的對應關係圖如下所示:
[3]
具體解釋:
- 條件佇列和阻塞佇列的節點,都是 Node 的例項物件,因為條件佇列的節點是需要轉移到阻塞佇列中取得
ReentrantLock
的例項物件可以通過多次呼叫newCondition()
方法來生成新的Condition
物件(最終由AQS
的具體子類物件生成)。在AQS
中,對於Condition
的具體實現為ConditionObject
,這個物件只有兩個屬性欄位:firstWaiter
和lastWaiter
- 每個
ConditionObject
都有一個自己的條件佇列,執行緒 1 通過呼叫Condition
物件的await
方法即可將當前的呼叫執行緒包裝成為 Node 後加入到條件佇列中,然後阻塞在條件佇列中,不再繼續執行後面的程式碼 - 呼叫
Condition
物件的signal()
方法將會觸發一次喚醒事件,與Object
的notify()
方法類似。此時喚醒的是條件佇列的隊頭節點,喚醒後會將firstWaiter
的節點移動到阻塞佇列的末尾,然後在阻塞佇列中等待獲取鎖,之後獲取鎖之後才能繼續執行
await
方法
await
方法對應的原始碼如下:
// AbstractQueuedSynchronizer.ConditionObject
/*
丟擲 InterruptedException 表示這個方法是可以被中斷的
這個方法會被阻塞,直到呼叫 signal 方法(singnal 和 singnalAll)喚醒或者被中斷
*/
public final void await() throws InterruptedException {
// 按照規範,應該在最開始的位置就首先檢測一次中斷
if (Thread.interrupted())
throw new InterruptedException();
// 將當前的執行緒封裝成 Node,新增到條件佇列中
Node node = addConditionWaiter();
/*
釋放鎖,返回值是釋放鎖之前的 state 值
在呼叫 await 方法之前,當前的執行緒肯定是持有鎖的,在這裡需要釋放掉當前持有的鎖
*/
int savedState = fullyRelease(node);
int interruptMode = 0;
/*
isOnSyncQueue(node) 返回 true 表示當前的節點已經從條件佇列轉移到阻塞隊列了
*/
while (!isOnSyncQueue(node)) {
/*
如果當前的節點不在阻塞佇列中,那麼將當前節點中的執行緒掛起,
直到通過呼叫 Condition 物件的 signal* 方法來喚醒它
*/
LockSupport.park(this);
// 執行緒被中斷,因此需要退出當前的迴圈
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 進入阻塞佇列之後,等待獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter()
對應的原始碼如下:
// AbstractQueuedSynchronizer.ConditionObject
/*
將當前執行緒包裝成一個 Node,插入的條件佇列末尾
*/
private Node addConditionWaiter() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 當前 ConditionObject 中條件佇列的尾節點
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
/*
如果尾結點的執行緒已經被取消了,那麼就清除它
注意當前節點所處的佇列為條件佇列,因此每個節點的狀態都應該是 Node.CONDITION
*/
if (t != null && t.waitStatus != Node.CONDITION) {
// 該方法會從前到後清除所有的不滿足條件的節點
unlinkCancelledWaiters();
t = lastWaiter;
}
// 建立一個新的 Node,當前的 Node 的 waitStatus 為 Node.CONDITION
Node node = new Node(Node.CONDITION);
// 處理初始佇列為空的情況
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/*
清除當前 ConditionObject 的條件佇列中所有 waitStatus 不為 CONDITION 的節點
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
// 單純的連結串列移除節點的操作
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
fullyRelease(node)
對應的原始碼如下:
// AbstractQueuedSynchronizer.ConditionObject
/*
該方法的主要目的是完全釋放當前節點中執行緒持有的鎖
之所以是完全釋放,這是因為鎖是可重入的
*/
final int fullyRelease(Node node) {
try {
/*
由於顯式鎖是可重入的,因此在呼叫 await() 時也必須再恢復到原來的狀態
回憶一下 Node 節點中 state 屬性代表的意義,如果 state > 0 表示當前持有的鎖的數量
獲取這個鎖的數量,使得在進入阻塞佇列中的 Node 能夠再恢復到原來的狀態
*/
int savedState = getState();
if (release(savedState)) // 參見上文有關 release 方法的介紹
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
/*
如果在釋放鎖的過程中失敗了,那麼就將這個節點的狀態設定為 CANCELLED,
在之後的處理中會移除這個節點
*/
node.waitStatus = Node.CANCELLED;
throw t;
}
}
isOnSyncQueue(node)
對應的原始碼:
// AbstractQueuedSynchronizer.ConditionObject
/*
判斷當前的節點是否是從條件佇列中轉移到了阻塞佇列,並且正在等待被喚醒
*/
final boolean isOnSyncQueue(Node node) {
/*
從條件佇列中移動到阻塞佇列中時,node 的 waitStatus 將會被設定為 0
如果 node 的 waitStatus 依舊為 Node.CONDITION,那麼則說明它還在條件佇列中
如果 node 的前驅節點為 null,那麼也一定還在等待佇列中(阻塞佇列中每個節點都會有前驅節點)
*/
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果 node 都已經存在後繼節點了,那麼肯定在阻塞佇列中了
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
/*
由於 CAS 在將條件佇列中的節點移動到阻塞佇列中時可能會失敗,(具體可以檢視 AQS 的入隊方法)
此時當前節點的前驅節點不為 null,為了解決這個問題,
需要遍歷阻塞佇列來確保當前的節點確實是已經進入到了阻塞佇列
*/
return findNodeFromTail(node);
}
// 對應的原始碼。。。。
private boolean findNodeFromTail(Node node) {
// We check for node first, since it's likely to be at or near tail.
// tail is known to be non-null, so we could re-order to "save"
// one null check, but we leave it this way to help the VM.
/*
從尾結點開始遍歷搜尋節點,檢查是否在阻塞佇列中
*/
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}
signal
方法
signal
方法用於喚醒正在等待的執行緒,在當前的環境下,signal
的主要目的是喚醒在條件佇列中執行緒節點,將它們移動到阻塞佇列中
AQS
中對於 signal()
方法的實現如下:
// AbstractQueuedSynchronizer.ConditionObject
/*
移動等待了最久的執行緒,將它從條件佇列移動到阻塞佇列
*/
public final void signal() {
// 呼叫 signal 的執行緒必須持有當前的獨佔鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 一般第一個節點就被視作 “等待最久” 的執行緒
Node first = firstWaiter;
// 真正喚醒執行緒
if (first != null)
doSignal(first);
}
/*
從前往後查詢第一個符合條件的節點(有的執行緒可能已經被取消或者被中斷了)
*/
private void doSignal(Node first) {
do {
// 移除第一個節點
/*
如果移除第一個節點之後條件佇列中不再有節點了,那麼需要將 lastWaiter
節點也置為 null
*/
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 移除該節點和佇列之間的連線關係
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 遍歷佇列,直到找到第一個滿足條件的節點
}
// AbstractQueuedSynchronizer
/*
將條件佇列中的節點移動到阻塞佇列
返回 true 表示轉移成功,false 則表示這個節點在呼叫 signal 之前就被取消了
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
/*
CAS 修改當前節點的 waitStatus如果失敗,說明該節點所在的執行緒已經被取消了
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
/*
這裡的的 p 是 node 在進入阻塞佇列之後的前驅節點
*/
Node p = enq(node); // 以自旋的方式進入阻塞佇列的隊尾
int ws = p.waitStatus;
/*
ws > 0 表示 node 在阻塞佇列中的前驅節點取消了等待,直接喚醒 node 對應的執行緒
ws <= 0,那麼在進入阻塞佇列的時候需要將 node 的前驅節點設定為 SIGNAL,表示前驅節點會喚醒後繼節點
*/
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
在喚醒執行緒之後,再檢視 await()
方法中的邏輯:
public final void await() throws InterruptedException {
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 當前執行緒被掛起
// 掛起後的後置處理
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// ……………………………………………………
}
interruptMode
可選的值如下:
REINTERRUPT
:在await
方法返回的時候,需要重新設定中斷狀態THROW_IE
:代表await
方法返回的時候,需要丟擲InterruptedException
異常- 0:表示在
await
方法呼叫期間,該執行緒沒有被中斷
執行緒被喚醒之後的第一步操作是呼叫 checkInterruptWhileWaiting(node)
檢查當前的執行緒是否被中斷了,對應的原始碼如下:
// AbstractQueuedSynchronizer.ConditionObject
// 返回對應 interruptMode 中的三個值
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// AbstractQueuedSynchronizer
/*
只有執行緒被中斷的情況下,才會呼叫此方法
如果需要的話,將這個已經取消等待的節點轉移到阻塞佇列
返回 true :如果此執行緒在 signal 呼叫之前被取消
*/
final boolean transferAfterCancelledWait(Node node) {
/*
CAS 將節點狀態設定為 0
如果這一步 CAS 成功,則說明是呼叫 signal 方法之前就已經發生了中斷,
因為 signal 方法會將條件佇列的首個節點的 waitStatus 置為 0 再移動到阻塞佇列
如果不為 0 則說明要麼被取消了,要麼還沒有呼叫 signal 進行處理
*/
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
enq(node); // 可以看到,即使被中斷了,依舊會將這個節點放入到阻塞佇列
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
/*
如果會走到這,那麼一定是 CAS 設定 node 的 waitStatus 失敗了,
即是在呼叫 signal 之後發生的中斷
signal 會將節點移動從條件佇列移動到阻塞佇列,但是可能由於某些原因還沒有移動完成,
因此在這裡通過自旋的方式等待其完成
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
可以看到,即使發生了中斷,依舊會完成將 node 從條件佇列轉移到阻塞佇列
喚醒執行緒後繼續向下走,對應的原始碼如下:
public final void await() throws InterruptedException {
// 省略部分程式碼
/*
當 acquireQueued 方法返回 true 時,說明執行緒已經被中斷了
如果此時 interruptMode 為 THROW_IE 的話,說明在呼叫 signal 方法之前就已經被中斷了
在這種情況下,將 interruptMode 置為 REINTERRUPT,以便之後重新中斷
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
}
繼續向下執行,對應的原始碼:
/*
在呼叫 signal 時會斷開當前節點和後繼節點之間的連線,
如果此時後繼節點不為 null,說明是被中斷的,同樣需要斷開這個節點在條件佇列中的連線
*/
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 處理中斷
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
reportInterruptAfterWait(interruptMode)
對應的原始碼:
// AbstractQueuedSynchronizer.ConditionObject
// 處理中斷
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 根據 interruptMode 對中斷進行不同的處理
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
處理中斷
在 acquireQueued
方法的執行過程中,對於中斷的處理程式碼如下:
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
// 重點在於 parkAndCheckInterrupt 方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 該方法會清除中斷標記
}
在 acquireQueued
中,只是單純地使用一個變數 interrupted
來標記是否被中斷過,也就是說,在 acquireQueued
中,並不會處理中斷,即使當前的執行緒節點被中斷了,它依舊會嘗試去獲取鎖
具體對於中斷的處理由具體的實現來定義,可以忽略這個中斷,也可以丟擲一個異常
以 ReentrantLock
對於 lockInterruptibly()
的實現為例,具體的實現程式碼如下:
// ReentrantLock
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1); // 該方法為 AQS 中定義的方法
}
AQS
中對於 acquireInterruptibly
方法的定義如下:
// AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
throws InterruptedException {
/*
在 parkAndCheckInterrupt() 方法中通過 Thread.interrupted()
方法清除了執行緒的中斷標記,因此不會走這
*/
if (Thread.interrupted())
throw new InterruptedException();
// 繼續往下走
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// doAcquireInterruptibly 方法的定義如下
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
/*
關鍵在這,與不丟擲 InterruptedException 的相比,最大的區別就在於對於中斷的處理,
上文的 acquireQueued 則只是將中斷標記返回給呼叫者而不是顯式地丟擲一個異常
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node); // 取消該節點去獲取鎖的行為
throw t; // 傳遞捕獲到的異常
}
}
cancelAcquire(node)
對應的原始碼如下:
// AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
/*
找到符合條件的前驅節點,將不符合條件的前驅節點都清除
*/
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary, although with
// a possibility that a cancelled node may transiently remain
// reachable.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
/*
一般的連結串列清除節點工作
*/
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
參考:
[1] 《Java 併發程式設計實戰》