【併發程式設計】 圖文深入解析Java顯式鎖底層原始碼 —— condition 實現執行緒排程
一、回顧 AQS 資源的鎖定與釋放
上篇文章(文章中有詳細的原始碼解讀) 說到,AQS
在 tryRelease
失敗後,資源的鎖定與釋放,正向流程大體可以分為以下6個階段。
1-2:當其他執行緒佔據了鎖定的資源,另一個執行緒進行獲取時,會進入
FIFO
佇列,如果佇列未初始化,則進入初始化。
3:進入了 FIFO 佇列之後,開始自旋,並不斷嘗試將前一個節點的
waitStatus
替換成-1 == SIGNAL
4:最後一次自旋,前一個節點的
waitStatus
已經是-1 == SIGNAL
,則進入阻塞模式`
5:當持有資源鎖定的執行緒呼叫了
release
將state
從 1 改為 0之後,本執行緒被喚醒,繼續自旋
6:如果本執行緒節點的
prev
節點為HEAD
,則有機會進行一次資源獲取,如果獲取成功(將state
由 0 改為 1 ),則將自己置為頭結點,自旋結束。
注意:
為了防止混淆,本文將 AQS
的佇列(上篇文章,也是上圖說到的佇列)稱為 On Sync Queue
(藍底+淺藍色Node
),本文中伴隨 Condition
物件出現的佇列稱為 Condition Queue
(藍底+黑色Node
)
二、AQS 之 Condition 簡單 Demo
上篇文章,我們只說到了 waitStatus
的初始狀態 0
以及 SIGNAL = -1
next
節點進行 unPark
,即喚醒 next
節點。
而接下來這一小節將著重分析 CONDITION = -2
的情況,-2
的意思是代表當前這個節點在 Condition Queue
中排隊,等待通知 (signal
)。
Condition
的建立十分簡單,在原來的顯式鎖上呼叫 newCondition()
即可,使用方法和我們熟知的 wait
、notify
類似,condition
為我們提供了 await
、signal
方法,但是它可以做更加細粒度的控制,我們看看下面這個簡單的 Demo。
/** * Created by Anur IjuoKaruKas on 2019/6/4 */ public class Condition { private ReentrantLock reentrantLock = new ReentrantLock(); private java.util.concurrent.locks.Condition meetWaiter = reentrantLock.newCondition(); private java.util.concurrent.locks.Condition fruitWaiter = reentrantLock.newCondition(); private void buyMeet() throws InterruptedException { try { reentrantLock.lock(); print("前去買肉發現沒貨"); meetWaiter.await(); print("被通知:肉進貨了~"); } finally { reentrantLock.unlock(); } } private void buyFruit() throws InterruptedException { try { reentrantLock.lock(); print("前去水果發現沒貨"); fruitWaiter.await(); print("被通知:水果進貨了~"); } finally { reentrantLock.unlock(); } } private void meetIn() { try { reentrantLock.lock(); print("通知:肉進貨了~"); meetWaiter.signal(); } finally { reentrantLock.unlock(); } } private void fruitIn() { try { reentrantLock.lock(); print("通知:水果進貨了~"); fruitWaiter.signal(); } finally { reentrantLock.unlock(); } } public static void main(String[] args) throws InterruptedException { Condition condition = new Condition(); new Thread(() -> { try { condition.buyFruit(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { condition.buyFruit(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { condition.buyMeet(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); Thread.sleep(1000); condition.fruitIn(); Thread.sleep(1000); condition.meetIn(); } public static void print(String print) { System.out.println(String.format("時間 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print)); } } ==================================================== 時間 - Wed Jun 05 10:22:14 CST 2019 Thread[Thread-0,5,main] 前去水果發現沒貨 時間 - Wed Jun 05 10:22:14 CST 2019 Thread[Thread-2,5,main] 前去買肉發現沒貨 時間 - Wed Jun 05 10:22:14 CST 2019 Thread[Thread-1,5,main] 前去水果發現沒貨 時間 - Wed Jun 05 10:22:15 CST 2019 Thread[main,5,main] 通知:水果進貨了~ 時間 - Wed Jun 05 10:22:15 CST 2019 Thread[Thread-0,5,main] 被通知:水果進貨了~ 時間 - Wed Jun 05 10:22:16 CST 2019 Thread[main,5,main] 通知:肉進貨了~ 時間 - Wed Jun 05 10:22:16 CST 2019 Thread[Thread-2,5,main] 被通知:肉進貨了~
例子雖然舉的比較粗俗...... 但是核心就是執行緒的排程,我們可以在某些條件下使得顯示鎖阻塞,且通過某些條件被喚醒。
可以看到,我們可以分別為 fruitWaiter
或者 meetWaiter
進行細粒度的喚醒 signal
(其實還有個 signalAll
)。至於 condition
的使用我們這裡不做過多贅述。
三、AQS 之 Condition 正向流程原始碼解析
我們先縱覽一下 await
方法:(不考慮執行緒被 interrupt
的情況)
- 呼叫
addConditionWaiter();
,這一步實際上和我們前面說的FIFO
佇列很像,操作的是Condition Queue
- 呼叫
int savedState = fullyRelease(node);
,如果對前面說的release
有印象的話,那麼這個就很好理解了,一般我們一次release
,正常實現都是使得state --
,對應acquire
使得state ++
。而這個fullyRelease
則是一次性釋放掉所有state
,直接讓state
歸零,並儲存state
狀態。 isOnSyncQueue
則是進行一系列判斷、阻塞與自旋,它是控制condition
阻塞的核心程式碼(實際上很簡單)。
- 被其他持鎖執行緒
signal
進行通知彈出Condition Queue
,且進入On Sync Queue
。
- 回到我們上篇文章說的
tryAcquire
自旋(本文第一章階段3-6),實際上到這一步,condition
阻塞已經完畢了,接下來回歸我們的正常流程,可以理解為,此時被某個執行緒通知喚醒,但是一喚醒我們不能並不能立刻獲得資源,正常的流程還是要走的。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();// 1、進入 condition 佇列
int savedState = fullyRelease(node);// 2、釋放資源,並記錄 state
int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 3、condition 佇列阻塞,直到被其他持鎖執行緒 signal(或者被 interrupt)才會停止自旋。
LockSupport.park(this); // 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;// 被 interrupt
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// /5、重新自旋,開始申請鎖定資源
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 6、如果有必要的話,修改 condition 佇列
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
下面我們逐一對其進行原始碼分析:
1、addConditionWaiter 另一個 FIFO 佇列!
程式碼十分簡單:
unlinkCancelledWaiters
迭代清理所有waitState
不為Node.CONDITION
的節點,並重新設定尾節點。- 新建一個
Node
,並將其塞到尾部。Node
物件上篇文章已經講過,其實它就是一個搭載了一些狀態,以及當前執行緒的一個例項。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
2、靈活的 state 應用:fullyRelease
程式碼更加簡單,如果你還記得上篇文章所說的,正常實現都是使得 state --
,對應 acquire
使得 state ++
的話。這裡實際上就是一夜回到解放前,release
所有 state
。
release
的實現就不多說了,上篇文章裡已經說得很清楚了。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
3、阻塞、且等待被喚醒:isOnSyncQueue 相關
先說說這個 while
迴圈,先不看 isOnSyncQueue
的實現,外面邏輯很清晰,只要 isOnSyncQueue
為假,執行緒就會阻塞(park
)。後續則是進行是否被 interrupt
的判斷,如果被 interrupt
,則跳出迴圈,否則在 isOnSyncQueue
為真之前,執行緒會不停的被阻塞、喚醒、阻塞、喚醒。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
接著再看看 isOnSyncQueue
是如何實現的。通過前面我們可以知道,結果為真,就可以跳出 while
迴圈。使得結果為真的條件只有兩個:
- 如果
node.next != null
,則結果為真。 - 本執行緒的
Node
已經位於On Sync Queue
了:findNodeFromTail
方法是一個簡單的查詢方法,但它是從On Sync Queue
的tail
節點,不斷往前尋找,如果找到了本Node
,則結果為真。
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
在 Condition Queue
的解析中,到目前為止只出現了 FirstWaiter
、LastWaiter
、以及 Node
本身的成員變數 nextWaiter
。為什麼會出現在上篇文章 On Sync Queue
中涉及的 next
、prev
等 "指標" 作為判斷條件呢?(參考本文第一章那幾張圖)
4、通知可以離開 Condition Queue 了,但實際上並不喚醒:signal 實現分析
帶著上面的疑問,我們來到了 signal()
方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal
方法的門面比較簡單,isHeldExclusively
需要自己實現,ReentrantLock
內部的 Sync
實現是判斷當前執行緒是否持有鎖定資源,也就是判斷 getExclusiveOwnerThread() == Thread.currentThread();
鎖的執行緒持有者和當前是否相等。
isHeldExclusively
的設計十分靈活,如果必要的話,我們可以實現一個不需要持有鎖執行緒便可進行 signal
的 AQS
實現,即:不做任何判斷直接返回 true
即可。注意,要使用 condition
必須實現此方法!!。
signal
方法的核心是 doSignal(first);
,我們重點看看在這裡做了什麼:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
-
do
階段主要是將Condition Queue
隊頭節點的nextWaiter
變成新的隊頭,並同時將nextWaiter
引用擦除(情況1),如果沒有nextWaiter
,則將佇列清空(情況2)。 -
while
階段則是兩個常規判斷,(first = firstWaiter) != null
很好理解,類似遞迴呼叫,不做贅述。關鍵看看!transferForSignal(first)
。總結一下就是當transferForSignal
為真或者佇列已經空了,則跳出while
迴圈。
transferForSignal
主要做了如下操作:
- 將當前
Node
的狀態由CONDITION == -2
改為0
,失敗則返回flase
enq(node);
,這個其實就是上篇文章說道的addWaiter
的核心操作,就是將當前Node
塞進On Sync Queue
。- 優化操作,如果上個節點剛好
cancel (ws > 0)
了,或者CAS
失敗,則將當前節點直接喚醒( 其實就是給了condition
一個優先去競爭原子state
的機會 )。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
整理一下 signal
的邏輯,可以用下圖表示,用一句話簡單總結就是將 Condition Queue
的頭部取出,拿到 On Sync Queue
的尾部。
5、老生常談:tryAcquire
這一步不多說,略過,上篇文章已經解釋的很清楚了。
四、AQS 之 Condition 總覽
AQS
中 On Sync Queue
和 Condition Queue
的關係可以如下表示:
- 每個
Condition
都有自己的Condition Queue
,且多個Condition Queue
之間的await()
、signal()
方法相互不影響。 - 當某個持有鎖的執行緒呼叫了某個
Condition
的await()
方法以後,會釋放掉鎖,且進入該Condition
所對應的Condition Queue
的隊尾。 - 當有某個執行緒呼叫了某個
Condition
的signal()
方法後,該Condition
所對應的Condition Queue
隊頭出列,緊接著進入到On Sync Queue
隊尾。注意,該節點並不會被直接喚醒,只是進了On Sync Queue
隊尾。
文章皆是基於原始碼一步步分析,沒有參考過多資料,如有錯誤,請指出!!
另外歡迎來 Q 群討論技術相關(目前基本沒人)[左二維碼]~
如果覺得寫得好還可以關注一波訂閱號喲 ~ 部落格和訂閱號同步更新 [右二維碼]~
參考資料:
JDK12 原始碼
另外小夥伴可以思考一下:
- 在節點從
Condition Queue
出隊時,如果上個節點剛好cancel (ws > 0)
了,或者CAS
失敗,則將當前節點直接喚醒,這個優化是為什麼? - 本文沒有提到
Condition
的signalAll()
方法,呼叫這個方法後,會