AbstractQueuedSynchronizer(AQS)原始碼分析詳解
AbstractQueuedSynchronizer
是一個抽象的同步等待佇列。juc下面多大多數鎖都是使用到了AQS
。除了獲取資源和釋放資源的方法外,AQS
基本上將其他同步等待佇列的邏輯都實現了,比如現場進入佇列同步等待、相應中斷、喚醒執行緒等。如果我們想實現一個簡單的同步等待佇列,那隻需要實現AQS
的獲取和釋放資源的方法就行了(實際上還有一個用來判斷釋放是當前現場持有資源的方法需要實現)。
1. AQS類的基本結構
首先實現了AbstractOwnableSynchronizer
類,裡面維護了一個FIFO雙向佇列,也就是我們的等待佇列,記錄了佇列的頭節點和尾節點;還維護了一個狀態資訊state
state
有著不同的意義,但是可以肯定的是state
跟執行緒釋放進入等待佇列是由密切關係的。
在多執行緒情況下,對雙向佇列的頭節點和尾節點以及state
的修改都是使用的cas進行的修改(不存在多執行緒的話,就直接賦值了,如果獨佔鎖釋放鎖資源,因為之會有一個執行緒持有鎖資源)。
1.1 AbstractOwnableSynchronizer類結構
核心就是設定和獲取exclusiveOwnerThread
,exclusiveOwnerThread
表示獨佔模式下持有鎖的執行緒。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { /** Use serial ID even though all fields transient. */ private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } /** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread; /** * Sets the thread that currently owns exclusive access. * A {@code null} argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * {@code volatile} field accesses. * @param thread the owner thread */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /** * Returns the thread last set by {@code setExclusiveOwnerThread}, * or {@code null} if never set. This method does not otherwise * impose any synchronization or {@code volatile} field accesses. * @return the owner thread */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
1.2 等待佇列
同步等待佇列中是要給雙向佇列,在AQS
中記錄了頭節點和尾節點(都用了volatile
修飾,確保了head
和tail
的可見性),一般來說頭節點是一個空節點(這點後面會說到)。每個節點裡面記錄了當前節點的等待模式(獨佔模式或者共享模式),同時記錄了當前節點的等待狀態,pre和next記錄了當前節點的上一個節點和下一個節點,thread表示節點對應的執行緒。
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
上面兩個有參構造方法,一個是被addWiter呼叫,一個是被Condition(也就是條件佇列)呼叫,通過兩個方法我們可以看到,addWaiter建立的node,waitStatus=0;通過Condition建立的node,waitStatus是CONDITION(-2)狀態(addConditionWaiter
方法中可以看到)。需要注意的是一般共享鎖是不支援Condition的,所以通過第二個有參構造方法建立的節點是處於獨佔模式的,這樣我們就可以直接根據nextWaiter
就能判斷出節點的等待模式。
1.3 同步狀態state
在不同的鎖實現中,有著不同的意義,比如在ReentractLock
中,state
表示當前執行緒獲取鎖的重入次數。在ReentractReadWriteLock
中,state
的高16位表示持有讀鎖的執行緒數,低16位表示寫鎖的重入次數。但是無論state
怎麼存資料,他都與鎖的獲取與釋放有著密切的關係。
/**
* The synchronization state.
*/
private volatile int state;
1.4 幾個核心方法
以tryAcquire
和acquire
為例,tryAcquire
只是進行鎖資源的獲取,獲取成功就返回true,失敗返回false,而acquire
會根據tryAcquire
的返回結果來判斷,是否將當前執行緒阻塞放入等待佇列中。看名字我們就可以很簡單的分辨出方法直接點對應關係。在比如tryReleaseShared
是嘗試釋放鎖資源,如果返回true,就會去嘗試喚醒後繼處於等待狀態的節點。
以try開頭的這些方法是我們在實現自定義同步等待佇列的時候,需要重點關注的方法,我們需要重寫方法,實現具體的邏輯。當然,如果你只是自定義一個獨佔鎖,那你只需要實現tryAcquire
、tryRelease
、isHeldExclusively
就行,共享鎖相關的,你可以不用管。如果你想實現例如讀寫鎖,那就需要將共享模式對應的方法也要重寫,進行實現。
// 獨佔模式獲取鎖資源
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 獨佔模式釋放鎖資源
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式獲取鎖資源
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式釋放鎖資源
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 獨佔模式下,當前執行緒釋放是持有鎖的執行緒
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
2. 獨佔鎖
獨佔鎖指的是隻能有一個執行緒持有鎖資源,如果其他執行緒想要持有鎖,需要等待持有鎖的執行緒釋放資源之後,否則會進入等待佇列等待(沒有獲取到鎖資源),直到等到有執行緒釋放了鎖資源,並且對該等待執行緒進行了喚醒,此時執行緒就會去再次嘗試獲取鎖資源。
2.1 acquire
首先判斷獲取鎖資源釋放成功,如果成功,後面的就不能執行了,如果失敗,就需要進入等待佇列,等待其他執行緒喚醒自己,喚醒後會更新中斷狀態,再次去嘗試獲取鎖資源,如果獲取鎖成功,就根據中斷狀態處理中斷(返回true就會執行selfInterrupt
)。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2.1.1 建立等待節點和初始化佇列
這裡直接指定了當前執行緒和等待模式,建立了一個新的節點(上面我們在說等待佇列的時候已經說過這個構造方法了),然後判斷佇列是否需要初始化。如果不需要初始化,就會將剛建立的節點設定為尾節點。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq
方法幹了兩件事
- 如果發現佇列沒有初始化,就建立一個空節點,進行佇列初始化,此時佇列的頭節點和尾節點指向了同一個空節點。
- 初始化佇列之後,迴圈並沒有退出,而是來設定新建立的節點為尾節點,設定成功後推出迴圈。此時佇列的頭節點就是一個空節點,佇列的尾節點就是我們剛建立的的新節點。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.1.2 阻塞執行緒
進入到該方法後,如果當前節點的前驅節點是頭節點,會先嚐試獲取一個鎖資源tryAcquire
,所以一般來說,一個執行緒在呼叫acquire
後,會進行兩次鎖資源獲取,如果獲取鎖資源成功了,就將當前節點設定為頭節點,最後返回中斷狀態(按照我們說的這個流程的話,interrupted的狀態就是宣告時候的false),會在上面我們說的selfInterrupt
方法中處理中斷(這裡是線上程獲取到鎖資源之後處理中斷狀態)。
如果嘗試獲取鎖資源失敗,就會執行shouldParkAfterFailedAcquire
,來判斷(當前節點的前驅節點必須是signal狀態才進入阻塞狀態)釋放需要將當前執行緒掛起,如果返回true,就會執行後面的parkAndCheckInterrupt
將當前執行緒阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
就做了一件事,就是讓自己(當前節點)在一個signal狀態的節點後進入阻塞狀態。如果當前節點的前驅節點是一個有效狀態,就直接修改為signal狀態,如果不是,說明前驅節點被取消了,就需要往前找,找到一個有效狀態的節點,跟在後面。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 必須是signal狀態,才能被阻塞,否則就會再次去嘗試獲取一下鎖資源
return true;
if (ws > 0) { // 只有CANCELLED=1 這一個狀態,如果當前節點的前驅節點被取消了,就需要往前找一個狀態為有效狀態的waitStatus<=0節點,跟在他後面。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 如果是有效狀態,但是不是signal,就cas修改為signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
使用LockSupport
的park
方法將當前執行緒阻塞掛起。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); // 被喚醒後的第一件事是返回中斷狀態,並重置中斷狀態
}
2.1.3 從等待佇列中移除
根據acquireQueued
中的程式碼,我們可以知道,只有噹噹前節點未獲取到鎖資源,並且丟擲了異常,failed
就是初始值true
,就會執行下面的cancelAcquire
方法,該方法主要的目的是將當前節點從佇列中移除,會同時移除掉當前節點前後waitStatus=canceled
的節點,如果發現當前節點是head節點會去喚醒後繼節點。
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null; // 將當前節點thread置為null
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev; // 從當前節點往前找,找到一個有效等待節點 waitStatus <=0
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED; // 將當前節點的狀態設定為cancelled(1)
if (node == tail && compareAndSetTail(node, pred)) { // 如果他是tail節點,直接使用我們找到的pred節點替換他
compareAndSetNext(pred, predNext, null); // 因為是tail節點,所以節點後面就不會有下一個節點了,cas將next設定為null,這裡next=null,prev不變
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { // 如果找到的pred不是頭節點,就將pred的waitStatus設定為signal
Node next = node.next; // 噹噹前節點的next是一個有效等待節點,就cas設定給pred的next
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node); // 如果當前節點時head,就去喚醒他的後繼節點。
}
node.next = node; // help GC
}
}
2.1.4 中斷處理
將狀態操作補上,selfInterrupt
執行,是因為acquireQueued
返回true,根據上面的程式碼我們可以看到acquireQueued
返回的就是中斷狀態,並且只有獲取到鎖資源的執行緒(這裡說的是正常請求,拋異常也是可以返回的),才能從acquireQueued
中返回。如果是被中斷了,但是沒有獲取到鎖資源,那麼就會將中斷狀態記錄下來,等拿到鎖資源後返回。所以這裡的中斷操作我們才說是補的。因為存線上程被中斷了,但是一直沒有拿到鎖,導致不能馬上響應中斷操作。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
2.2 release
release
是釋放鎖資源,如果tryRelease
釋放鎖資源成功(返回true),就會判斷頭節點是否有後繼節點需要喚醒。根據前面分析的程式碼,我們可以知道,當節點初始化時waitStatus
是0,當有新的節點掛在剛初始化的節點的後面時,waitStatus
會被修改為signal(-1)
狀態,所以這裡判斷 h.waitStatus != 0
的意思是說頭節點後面還有後繼節點。此時就會去喚醒後繼節點。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2.2.1 喚醒後繼節點
這裡傳入的node是head節點,首先是將head節點的waitStatus
修改為0,並且拿到head的後繼節點,去喚醒。如果候機節點不是一個有效等待節點,就從佇列的尾部開始從後往前找,找到一個有效等待節點去喚醒。
- 目前看來將
waitStatus=0
會增加一次後繼節點(將要喚醒的節點嘗試獲取鎖的機會),可以去看一下acquireQueued
的程式碼,喚醒後會嘗試湖區一次鎖,如果獲取鎖失敗,會執行shouldParkAfterFailedAcquire
方法,因為waitStatus=0
,如果第一次會將waitStatus
cas為signal
,然後返回false,此時就會再去嘗試獲取一下鎖,如果依舊獲取失敗,就會執行被阻塞(因為shouldParkAfterFailedAcquire
方法判斷到waitStatus=signle
,直接返回true)。所以waitStatus
設定為0,會增加一次獲取鎖的機會。 - 我們知道,尾節點的next是null,這裡我們獲取到的s==null其實並不表示s就一定是tail節點。因為還存在新增加一個節點next還沒來得及設定。也正是因為這原因,所以這裡在找一個有效等待節點時,選擇了從後往前找,因為pre的更新是可靠的,下面補一點前面
enq
中的程式碼,我們可以看到在新增一個節點時,第一步是將新增加的節點的prev指向當前tail節點,然後進行第二步將新增加的節點設定tail節點,最後是第三步將舊的tail節點的next只想新增加的節點。首先者三個操作肯定不是原子性操作。當第二部操作成功了,說明第一步操作也肯定成功了,此時就會出現一種情況舊的tail節點的next還沒指向新的tail節點,就會出現unparkSuccessor
中s=null
(s不是tail節點)的請求,如果是從前往後遍歷,因為next是不可靠的,會出現s=null但是s並不是tail節點的情況。所以使用從後往前遍歷,因為prev的更新,在第一步操作的時候就完成了,新增節點能成功tail的前提是prev已經更新了。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // 1
if (compareAndSetTail(t, node)) { // 2
// 可能執行緒會被掛起,來不及執行第三步
t.next = node; // 3
return t;
}
}
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
2.3 其他
2.3.1 acquireInterruptibly
與acquire
相比,多了很多中斷處理的判斷,會對中斷進行及時處理。不會像acquire
那樣,需要等到獲取到鎖資源才去處理中斷。這裡主要說與acquire
相比的一些不同點。
在進入方法後就會呼叫Thread.interrupted()
獲取當前中斷狀態,並重置中斷狀態,如果被中斷了就丟擲異常。然後是獲取鎖資源。可以看到這裡換成了doAcquireInterruptibly
處理排隊。裡面與acquireQueued
很類似,核心邏輯沒變。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
這裡將addWaiter
方法放了進來,在parkAndCheckInterrupt
之後,如果發現中斷了,就直接拋異常,此時會執行cancelAcquire
方法將當前節點從佇列中移除。最後將異常丟擲。
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 這裡直接丟擲異常,而不是僅僅對中斷狀態進行記錄
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.3.2 tryAcquireNanos
不經會對中斷進行及時處理,還設定了一個等待超時時間,核心是使用了LockSupport.parkNanos
方法,當超過執行時間會返回。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
首先會計算出絕對時間,如果獲取到了鎖資源就返回true,如果超時了就返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) // 時間小於等於給定值,就直接不park了
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3. 共享鎖
對於共享鎖來說,鎖資源與具體執行緒是不相關的,如果一個執行緒獲取了鎖資源,另一個執行緒再來獲取鎖資源時,如果鎖資源還能滿足當前執行緒需求,那麼依舊可以獲取鎖資源。
3.1 acquireShared
首先是通過tryAcquireShared
嘗試獲取鎖資源,並根據返回值判斷,是否獲取鎖資源成功,<0
表示獲取鎖資源失敗,=0
說明獲取到了鎖資源,但是沒有多餘的資源了,>0
說明獲取到了鎖資源,並且還有多餘的資源。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
3.1.1 doAcquireShared
與獨佔鎖的實現很類似,首先呼叫addWaiter
建立節點並新增到佇列中,然後獲取當前節點的前驅節點,如果前驅節點是頭節點,就嘗試獲取鎖資源,根據返回值判斷是否獲取到鎖資源,如果獲取到了鎖資源,這裡首先會執行setHeadAndPropagate
方法,該方法主要是更行head
,然後根據條件看是否需要區喚醒後繼節點,這裡的中斷與acquire
中基本一致,獲取到鎖資源後,判斷記錄的中斷狀態,然後響應中斷。後面的程式碼和acquire
中的基本一致,就不多說了。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.1.2 setHeadAndPropagate
這裡的setHead
和acquireQueued
中的一樣,調的是同一個方法,重點是後面的判斷,propagate
表示的是剩餘的鎖資源,根據if中的程式碼我們可以知道,只有當存在剩餘鎖資源或者頭節點為null(一般來說head不會為null),或者頭節點waiteStatus<0
也就是需要被喚醒,就獲取當前節點的後繼節點判斷,後繼節點是否是null或者是共享節點(也就是判斷是否是一個非獨佔模式的節點),就會執行doReleaseShared
進行節點喚醒。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
3.1.3 doReleaseShared
將頭節點的是signal,說明需要喚醒後繼節點,就從signal修改為0,如果修改成功,就呼叫unparkSuccessor
喚醒後繼節點。如果頭節點的狀態是0,就從0改為PROPAGATE,不做其他操作。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3.2 releaseShared
共享鎖釋放資源,首先使用tryReleaseShared
進行鎖資源釋放,根據返回結果判斷是否執行doReleaseShared
區喚醒後繼節點。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
3.3 其他
3.3.1 acquireSharedInterruptibly
共享模式下獲取鎖響應中斷。跟acquireInterruptibly
處理中斷基本一致,看一眼下面程式碼就知道了。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3.2 tryAcquireSharedNanos
共享模式下的獲取鎖並設定等待超時時間。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4. 總結
AQS
提供了執行緒同步的基本實現,提供了兩種模式,獨佔模式和共享模式。使用連結串列的方式維護了一個雙向佇列,記錄存放著一個一個等待的執行緒。通過對被volatile
修改的state
操作,進行鎖資源的獲取與釋放,根據操作結果對佇列中節點進行阻塞和喚醒。我們需要仔細分割槽在什麼情況是存在多執行緒併發,在什麼情況下同一時刻只會有一個執行緒執行(比如獨佔鎖釋放鎖資源),從而確定我們的操作是否需要確保執行緒安全(cas操作)。其中獨佔和共享兩種模式其實編碼上很相似,只不過共享模式在每次獲取完資源後,會判斷是否是否有剩餘資源,從而選擇是否區喚醒後繼節點。
部落格園:http://www.cnblogs.com/godfunc
Copyright ©2019 Godfunc