【併發程式設計】 圖文深入解析Java顯示鎖底層原始碼 —— 加解鎖是如何實現的
一、瞭解 AbstractQueuedSynchronizer(AQS)
1、AQS 簡介
AbstractQueuedSynchronizer
是大師 Doug Lea 編寫的一個併發程式設計類,位於 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的組成部分,他們中關於 “鎖” 的部分與 AQS
息息相關。
借用一下原始碼中的說法,AbstractQueuedSynchronizer
基於一個 FIFO 佇列
提供了一套阻塞鎖和同步相關的實現。該類被設計成為很多同步容器 synchronizers
private volatile int state;
來表示當前狀態。當在 AQS
被 acquired
(獲取資源) 或被 release
(釋放資源)時,需要依據這個 state
來進行判斷。所以子類需要定義方法來修改這個狀態,該狀態的含義由我們自由定製。(翻譯的不好...)
2、實現最簡單的 AQS
我們來看一個最簡單的例子,我們有一個類 Sync
繼承了 AbstractQueuedSynchronizer
,並重寫了其 tryAcquire
和 tryRelease
方法。實現非常簡單,我們通過呼叫父類的 compareAndSetState()
setState()
來完成,簡單來說(不是特別準確),就是 tryAcquire
返回 true
,代表獲取鎖成功,否則就會阻塞。而 tryRelease
則負責鎖的釋放。
在例子中:將 state
設定為 100
代表當前狀態為無鎖,1
則代表已經有某個執行緒獲取了該鎖。當然這個 state
表達的含義是怎麼樣的,完全是我們定義的,實際上鎖定或者無鎖是 100
還是 200
還是 -100
,都沒有什麼關係。
/** * Created by Anur IjuoKaruKas on 2019/5/7 */ public class Mutex extends AbstractQueuedSynchronizer { public static class Sync extends AbstractQueuedSynchronizer { public Sync() { setState(100); // set the initial state, being unlocked. } @Override protected boolean tryAcquire(int ignore) { boolean result = compareAndSetState(100, 1); print("嘗試獲取鎖" + (result ? "成功" : "失敗")); return result; } @Override protected boolean tryRelease(int ignore) { setState(100); return true; } } private final Sync sync = new Sync(); public void lock() { sync.acquire(0); } public void unLock() { sync.release(0); } public static void main(String[] args) throws InterruptedException { Mutex mutex = new Mutex(); mutex.lock(); Thread thread = new Thread(() -> { print("呼叫 mutex.lock() 之前"); mutex.lock(); print("呼叫 mutex.lock() 之後"); }); thread.start(); print("main 執行緒 Sleep 之前"); Thread.sleep(5000); print("main 執行緒 Sleep 之後"); mutex.unLock(); } public static void print(String print) { System.out.println(String.format("時間 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print)); } } ========================================= 輸出 時間 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] 嘗試獲取鎖成功 時間 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] main 執行緒 Sleep 之前 時間 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 呼叫 mutex.lock() 之前 時間 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖失敗 時間 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖失敗 時間 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖失敗 時間 - Fri May 24 15:44:24 CST 2019 Thread[main,5,main] main 執行緒 Sleep 之後 時間 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖成功 時間 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 呼叫 mutex.lock() 之後
我們可以看到,程式碼符合我們的預期:在 main 函式所線上程呼叫 mutex.unLock();
釋放鎖之前,子執行緒是一直阻塞的,呼叫 mutex.lock() 之後
的日誌輸出發生在 main 執行緒 Sleep 之後
之後。
通過重寫 tryAcquire
、tryRelease
方法,以及呼叫 acquire
和 release
方法,我們很容易就實現了一個鎖,當然這個鎖有一堆問題... 我們只是通過這個小例子,來建立對 AQS
一個簡單的瞭解。
看到這裡,有些細心的小夥伴可能會想了,既然鎖是由 tryAcquire
控制的,那和 state
又有什麼關係呢? 我們完全可以定義一個自定義變數,比如 sign
,false
代表無鎖,true
代表鎖定,好像也可以實現這段邏輯啊?這個時候就需要引出我們神奇的 compareAndSet
,CAS
操作了。
3、AQS 繞不過的話題: CAS Compare And Swap
前面說到,我們暫時認為 :tryAcquire
返回 true
,代表獲取到鎖,反之只要 tryAcquire
返回 flase
,執行緒就會被阻塞(不準確,後面會細說)。實際上這裡有一個 隱含條件,我們必須做到:
- ※ 無論何時,都只能有一個執行緒
tryAcquire
成功,且在某個執行緒tryAcquire
成功之後,並在其release
釋放鎖之前,任何執行緒進行tryAcquire
都將返回false
。
是的,就是併發問題!
下面這個例子我們簡單使用一個自定義變數 sign
來實現 tryAcquire
,看看會發生什麼:
private boolean sign;
@Override
protected boolean tryAcquire(int ignore) {
boolean result = false;
if (!sign) {
sign = true;
result = true;
}
print("嘗試獲取鎖" + (result ? "成功" : "失敗"));
return result;
}
@Override
protected boolean tryRelease(int ignore) {
sign = false;
return true;
}
========================================= 輸出
時間 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] 嘗試獲取鎖成功
時間 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] main 執行緒 Sleep 之前
時間 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 呼叫 mutex.lock() 之前
時間 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖失敗
時間 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖失敗
時間 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖失敗
時間 - Fri May 24 18:03:17 CST 2019 Thread[main,5,main] main 執行緒 Sleep 之後
時間 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 嘗試獲取鎖成功
時間 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 呼叫 mutex.lock() 之後
看起來好像沒問題,在這個 demo 中也得到了和第一個 DEMO 一樣的預期的結果。然而事情並沒有那麼簡單,新寫的這個 tryAcquire
實現是一個 "CompareThenSet"
操作,在併發的情況下,會出現不可預期的情況
- 執行緒A 進來,發現
sign
為false
- 執行緒B 同時進來,也發現
sign
為false
- 兩者同時將
sign
修改為true
,問題就來了。
到底是 執行緒A 獲取到了鎖,還是 執行緒B 呢?(實際上都獲取到了)
我們改一下 Main 方法,我們使用 100 個執行緒併發執行 mutex.lock();
獲取鎖成功則會輸出語句 print("獲取鎖成功");
,執行,發現,竟然有兩個執行緒同時獲取到了鎖。有兩個執行緒同時將 sign
修改為了 true
。
public static void main(String[] args) throws InterruptedException {
Mutex mutex = new Mutex();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
threads.add(new Thread(() -> {
mutex.lock();
print("獲取鎖成功");
}));
}
ExecutorService executorService = Executors.newFixedThreadPool(100);
threads.forEach(executorService::submit);
Thread.sleep(1000);
}
如果我們使用 AQS
幫我們寫好的 compareAndSetState
則沒有這個問題。
在 Java9
之前,底層實現是呼叫 unsafe
包的 compareAndSwapInt
來實現的:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
而在 Java9
之後,則是使用 VarHandle
來實現 VarHandle
是 unSafe
的一個替代方案,本文不多贅述,後面會有文章講到這個 ~ 。
// VarHandle mechanics
private static final VarHandle STATE;
---------------------------------------------
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
這裡簡單的說一下 CAS
,即 CompareAndSwap
。CAS
可原子性地比較並替換一個值,樂觀鎖中一個典型的實現便是使用 CAS
來完成的。對併發程式設計有所瞭解的小夥伴應該都知道 CAS
,一般情況下,Compare(比較)
和 Swap(交換)
至少是兩個原子操作(實際上是更多個原子操作,主要看編譯成多少條機器碼)。而 CAS
則保證了 Compare
和 Swap
為一個原子操作。
二、深入理解 AbstractQueuedSynchronizer(AQS) 資源鎖定與解鎖正向流程
上文說到,我們暫時認為 :tryAcquire
返回 true
,代表獲取到鎖,反之只要 tryAcquire
返回 flase
,執行緒就會被阻塞。
AQS
當然沒有這麼簡單,但我們可以先看看加鎖時呼叫的 acquire
方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我們發現,tryAcquire
只是第一重判斷,如果 tryAcquire
失敗,緊接著還有另一個核心邏輯 acquireQueued
。在簡介裡,我們說,AQS
除了使用一個 原子state
來作為狀態判斷以外,還有一個 FIFO 佇列
,此佇列就和 acquireQueued
方法息息相關。另外,AQS
所控制的資源訪問,還可以是共享的,或者獨佔的(addWaiter
引數 Node.EXCLUSIVE
)。
以下的分析我們以一個簡單的 獨佔式非公平 AQS
實現: java.util.concurrent.locks.ReentrantLock.NonfairSync
來深入解析。獨佔式很好理解,大部分的鎖實現都只允許一個執行緒在同一時間獲取到鎖定的資源。
1、TryAcquire 與 TryRelease 的標準寫法及其優化
先看看 NonfairSync
的 tryAcuire
是怎麼實現及優化的。首先,NonfairSync
中將 state == 0
定義為無鎖狀態。
- 競爭優化: 如果當前無鎖(
state == 0
),再呼叫CAS
。這實際上對效能是一個很好的優化,假設當前取state
不為0
,實際上CompareAndSetState
成功的概率也很小,這也可以避免同一時間內,過多的執行緒去併發修改state
這個狀態。 - 重入設計: 試想如果我們不判斷當前執行緒是否持有鎖,就去進行
CAS
操作,會發生什麼?毫無疑問是CAS
失敗,這會間接導致死鎖。這裡我們可以看到,重入以後,有一個int nextc = c + acquires;
操作,這是方便我們記錄到底套了幾層鎖用的,如果沒有這個機制,我們將無法精確的控制加鎖和解鎖的層級,難免會出現一些意料之外的情況。簡單來說:lock
幾次,就要unLock
幾次。當然我們也可以做到aquire
多次,一次性release
掉,或者反過來,取決於怎麼我們實現tryAquire
和tryRlease
方法。 - 偏向優化: 這個優化實際上很簡單,如果說要獲取鎖的執行緒就是鎖的持有執行緒,我們無需去進行任何
CAS
操作,返回true
即可。
@ReservedStackAccess
final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 避免過多的執行緒競爭 CAS 操作
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);// 如果 CAS 操作成功,則將當前執行緒儲存起來,重入和解鎖時用於判斷。
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 重入優化,每次加鎖相當於 `state++`
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true; // 偏向優化
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 每次解鎖相當於 `state--` 直到 state == 0 ,代表可釋放鎖了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2、AcquireQueued 解析
①、addWaiter階段
如果 tryAquire
失敗,就會進入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,addWaiter
方法建立了一個新的 Node
例項,Node
例項中主要儲存了當前執行緒資訊,並將 nextWaiter
賦值為 Node.EXCLUSIVE
, 這個 nextWaiter
後面再談,它主要用於執行緒排程、以及獨佔模式、共享模式的區分,我們可以先不管它。
操作比較簡單,原理是將 node
塞入雙向連結串列尾端,也就是前面提到的 FIFO佇列
。就是利用 CAS
操作將新建立的、帶有本執行緒資訊的 node
設定為雙向連結串列新的 tail
,並且修改兩者的 ‘指標’ prev
和 next
。
/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue(); // 初始化雙向連結串列,就是建立一個新的空 node,並且頭尾都是此 node。
// 這個 node 除了拿來標記連結串列從哪裡開始,沒有什麼別的意義。
}
}
②、acquireQueued階段(自旋)
入隊成功後,進入 acquireQueued
方法,拋開執行緒被 interrupt
的情況,acquireQueued
的程式碼其實也很簡單,我們不看 interrupt
相關邏輯,其實邏輯還是很簡單的。這是一個無限迴圈(或者說自旋),只要沒有 tryAcquire
成功,就會一直迴圈下去,邏輯如下:
- 如果上一個節點是
FIFO
佇列頭,則進行一次tryAquire
,如果成功,則跳出迴圈。 - 檢測是否需要阻塞,如果需要阻塞,則阻塞等待喚醒,
parkAndCheckInterrupt
便是阻塞直到被喚醒(或者被interrupt
,暫時先不考慮這個情況)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire()
和 parkAndCheckInterrupt()
都很好理解,前者就是去嘗試一下獲取鎖定資源,看能否成功。後者則是阻塞直到被喚醒。
③、阻塞階段
我們先說說 shouldParkAfterFailedAcquire
,這個判斷是一個挺有意思的設計,後續文章會細說,它和執行緒排程、取消獲取鎖等相關。因為在獲取鎖定資源和釋放鎖定資源的過程中,實際上我們只需要用到兩個狀態,一個是初始狀態 pred.waitStatus == 0
,另一個是 pred.waitStatus == SIGNAL == -1
。
程式碼中我們可以很容易看出,在 CAS
將 prev
節點的 waitStatus
設定為 SIGNAL : -1
之前,都將返回 false
,如果設定成功,下一次自旋進入該方法就是 true
了,也就是說,會進入 parkAndCheckInterrupt()
方法,阻塞直到被喚醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
自旋階段圖解:
阻塞階段圖解:
3、release 解析
①、喚醒 FIFO 的下一個節點
阻塞直到喚醒這個邏輯在鎖定資源、釋放資源 這兩個階段來看十分簡單,最後我們來看看 release
做了什麼,release
除了呼叫了我們自己實現的 tryRelease
之外,其實關鍵的就是這個 unparkSuccessor
。
tryRelease
上面也說過了,就是改改原子 state
,這裡不多贅述。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
程式碼中可以看出,當 FIFO
佇列不為空且頭結點的 waitStatu
被修改過,就會進入 unparkSuccessor
,unparkSuccessor
傳入了當前 FIFO
的佇列頭,邏輯如下:
- 如果當前節點
waitStatus
為負(可能為SIGNAL
、CONDITION
或者PROPAGATE
),我們這裡簡單先看成只有SIGNAL
狀態,則CAS
將其設定為0
。其他幾個狀態我們後面會說到。 - 如果
!(s == null || s.waitStatus > 0)
,也就是說node.next
的waitStatus <= 0
,則簡單的直接將其喚醒:LockSupport.unpark(s.thread);
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
②、被喚醒後
被喚醒的執行緒當然不是直接獲得了鎖,它還是會繼續 acquireQueue
進行自旋,邏輯還是和之前一樣,避免小夥伴往上翻程式碼,這裡貼了一份如果 prev
是頭結點,如果 tryAcquire
成功,我們看到其實很簡單,只是將自己設為頭部即可。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
這篇文章只是簡單的說說 AQS
的正向獲取資源,釋放資源流程,後續會繼續解析 wait
、notify
、condition
等基於 AQS
的執行緒排程解析 ~~ 以及各個鎖是如何實現 AQS
的 ~~
文章皆是基於原始碼一步步分析,沒有參考過多資料,如有錯誤,請指出!!
另外歡迎來 Q 群討論技術相關(目前基本沒人)[左二維碼]~
如果覺得寫得好還可以關注一波訂閱號喲 ~ 部落格和訂閱號同步更新 [右二維碼]~
參考資料:
JDK12 原始碼
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example
另外小夥伴可以思考一下:
- 如果說階段7:
ThreadB
被喚醒後,繼續自旋時,另一個執行緒ThreadC
tryAcquire
成功了會發生什麼。 - 如果說第一個問題了解了,那應該就很清楚為什麼說本文解析的這個鎖叫做:非公平鎖了
- 眾所周知,只要是
CAS
操作,都有ABA
問題,如果說修改waitStatus
發生了ABA
問題,會