前文「JDK原始碼分析-AbstractQueuedSynchronizer(2)」分析了 AQS 在獨佔模式下獲取資源的流程,本文分析共享模式下的相關操作。
1. acquireShared(int arg): 以共享模式獲取資源,忽略中斷;
2. acquireSharedInterruptibly(int arg): 以共享模式獲取資源,響應中斷;
3. tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式獲取資源,響應中斷,且有超時等待;
4. releaseShared(int arg): 釋放資源,喚醒後繼節點,並確保傳播。
1. 共享模式獲取資源(忽略中斷)
public final void acquireShared(int arg) { // 返回值小於 0,表示獲取失敗 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 嘗試以共享模式獲取資源(返回值為 int 型別) protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
與獨佔模式的 tryAcquire 方法類似,tryAcquireShared 方法在 AQS 中也丟擲異常,由子類實現其邏輯。
不同的地方在於,tryAcquire 方法的返回結果是 boolean 型別,表示獲取成功與否;而 tryAcquireShared 的返回結果是 int 型別,分別為:
1) 負數:表示獲取失敗;
2) 0:表示獲取成功,但後續共享模式的獲取會失敗;
3) 正數:表示獲取成功,後續共享模式的獲取可能會成功(需要進行檢測)。
若 tryAcquireShared 獲取成功,則直接返回;否則執行 doAcquireShared 方法:
private void doAcquireShared(int arg) { // 把當前執行緒封裝成共享模式的 Node 節點,插入主佇列末尾 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 中斷標誌位 boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 若前驅節點為頭節點,則嘗試獲取資源 if (p == head) { int r = tryAcquireShared(arg); // 這裡表示當前執行緒成功獲取到了資源 if (r >= 0) { // 設定頭節點,並傳播狀態(注意這裡與獨佔模式不同) setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 是否應該休眠(與獨佔模式相同,不再贅述) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 取消操作(與獨佔模式相同) cancelAcquire(node); } }
doAcquireShared 方法會把當前執行緒封裝成一個共享模式(SHARED)的節點,並插入主佇列末尾。addWaiter(Node mode) 方法前文已經分析過,不再贅述。
該方法與 acquireQueued 方法的區別在於 setHeadAndPropagate 方法,把當前節點設定為頭節點之後,還會有傳播(propagate)行為:
private void setHeadAndPropagate(Node node, int propagate) { // 記錄舊的頭節點 Node h = head; // Record old head for check below // 將 node 設定為頭節點 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 後繼節點為空或共享模式喚醒 if (s == null || s.isShared()) doReleaseShared(); } }
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 這裡的頭節點已經是上面設定後的頭節點了 Node h = head; // 由於該方法有兩個入口(setHeadAndPropagate 和 releaseShared),需考慮併發控制 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒後繼節點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若頭節點不變,則跳出迴圈;否則繼續迴圈 if (h == head) // loop if head changed break; } }
該方法與獨佔模式下的獲取方法 acquire 大體相似,不同在於該方法中,節點獲取資源後會傳播狀態,即,有可能會繼續喚醒後繼節點。值得注意的是:該方法有兩個入口 setHeadAndPropagate 和 releaseShared,可能有多個執行緒操作,需考慮併發控制。
此外,本人對於將節點設定為 PROPAGATE 狀態的理解還不是很清晰,網上說法也不止一種,待後續研究明白再補充。
2. 以共享模式獲取資源(響應中斷)
該方法與 acquireShared 類似:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireShared 方法前面已分析,若獲取資源失敗,則會執行 doAcquireSharedInterruptly 方法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 把當前執行緒封裝成共享模式節點,並插入主佇列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 與 doAcquireShared 相比,區別在於這裡丟擲了異常 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
從程式碼可以看到,acquireSharedInterruptibly 方法與 acquireShared 方法幾乎完全一樣,不同之處僅在於前者會丟擲 InterruptedException 異常響應中斷;而後者僅記錄標誌位,獲取結束後才響應。
3. 以共享模式獲取資源(響應中斷,且有超時)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
該方法可與獨佔模式下的超時等待方法 tryAcquireNanos(int arg, long nanosTimeout) 進行對比,二者操作基本一致,不再詳細分析。
4. 釋放資源,喚醒節點,傳播狀態
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
doReleaseShared() 方法前面已經分析過了。本方法與獨佔模式的 release 方法類似,不同的地方在於“傳播”二字。
場景如下:有 T0~T4 共 5 個執行緒按先後順序獲取資源,其中 T2 和 T3 為共享模式,其他均為獨佔模式。
就此場景分析:T0 先獲取到資源(假設佔用時間較長),而後 T1~T4 再獲取則失敗,會依次進入主佇列。此時主佇列中各個節點的狀態示意圖如下:
之後,T0 操作完畢並釋放資源,會將 T1 喚醒。T1(獨佔模式) 會從 acquireQueued(final Node node, int arg) 方法的迴圈中繼續獲取資源,這時會獲取成功,並將 T1 設定為頭節點(T 被移除)。此時主佇列節點示意圖如下:
此時,T1 獲取到資源並進行相關操作。
而後,T1 操作完釋放資源,並喚醒下一個節點 T2,T2(共享模式) 繼續從 doAcquireShared(int) 方法的迴圈中執行。此時 T2 獲取資源成功,將自身設為頭節點(T1 被移除),由於後繼節點 T3 也是共享模式,因此 T1 會繼續喚醒 T3;T3 喚醒後的操作與 T2 相同,但後繼節點 T4 不是共享模式,因此不再繼續喚醒。此時佇列節點狀態示意圖如下:
此時,T2 和 T3 同時獲取到資源。
之後,當二者都釋放資源後會喚醒 T4:
T4 獲取資源的與 T1 類似。
PS: 該場景僅供參考,只為便於理解,若有不當之處敬請指正。
1. acquireShared: 共享模式獲取資源,忽略中斷;
2. acquireSharedInterruptibly: 共享模式獲取資源,響應中斷;
3. tryAcquireSharedNanos: 共享模式獲取資源,響應中斷,有超時;
4. releaseShared: 釋放資源,喚醒後繼節點,並確保傳播。
並簡要分析一個場景下主佇列中各個節點的狀態。此外,AQS 中還有巢狀類 ConditionObject 及條件佇列的相關操作,後面涉及到的時候再進行分析。
單獨去分析 AQS 的原始碼比較枯燥,後文會結合 ReentrantLock、CountdownLatch 等常用併發工具類的原始碼進行分析。
