ReentrantLock 原始碼分析以及 AQS (一)
阿新 • • 發佈:2020-03-14
## 前言
JDK1.5 之後釋出了JUC(java.util.concurrent),用於解決多執行緒併發問題。AQS 是一個特別重要的同步框架,很多同步類都藉助於 AQS 實現了對執行緒同步狀態的管理。
AQS 中最主要的就是獨佔鎖和共享鎖的獲取和釋放,以及提供了一些可中斷的獲取鎖,超時等待鎖等方法。
ReentranLock 是基於 AQS 獨佔鎖的一個實現。ReentrantReadWriteLock 是基於 AQS 共享鎖的一個讀寫鎖實現。本來打算一篇文章裡面寫完獨佔鎖和共享鎖,但是發現篇幅太長了,也不易於消化。
因此,本篇就先結合 ReentrantLock 原始碼,分析 AQS 的獨佔鎖獲取和釋放。以及 ReentrantLock 的公平鎖和非公平鎖實現。
下一篇再寫 ReentrantReadWriteLock 讀寫鎖原始碼,以及 AQS 共享鎖的獲取和釋放。
在正式講解原始碼之前,牆裂建議讀者做一些準備工作,最好對以下知識有一定的瞭解,這樣閱讀起來原始碼會比較輕鬆(因為,我當初剛開始接觸多執行緒時,直接看 AQS 簡直是一臉懵逼,就像讀天書一樣。。)。
1. 瞭解雙向連結串列的資料結構,以及佇列的入隊出隊等操作。
2. LockSupport 的 park,unpark 方法,以及對執行緒的 interrupt 幾個方法瞭解(可參考:[LockSupport的 park 方法是怎麼響應中斷的?](https://mp.weixin.qq.com/s/-adhc-LYfvd9YXqSxstFiQ))。
3. 對 CAS 和自旋機制有一定的瞭解。
## AQS 同步佇列
AQS 內部維護了一個 FIFO(先進先出)的雙向佇列。它的內部是用雙向連結串列來實現的,每個資料節點(Node)中都包含了當前節點的執行緒資訊,還有它的前後兩個指標,分別指向前驅節點和後繼節點。下邊看一下 Node 的屬性和方法:
```
static final class Node {
//可以認為是一種標記,表明了這個 node 是以共享模式在同步佇列中等待
static final Node SHARED = new Node();
//也是一種標記,表明這個 node 是以獨佔模式在同步佇列中等待
static final Node EXCLUSIVE = null;
/** waitStatus 常量值 */
//說明當前節點被取消,原因有可能是超時,或者被中斷。
//節點被取消的狀態是不可逆的,也就是說此節點會一直停留在取消狀態,不會轉變。
static final int CANCELLED = 1;
//說明後繼節點的執行緒被 park 阻塞,因此當前執行緒需要在釋放鎖或者被取消時,喚醒後繼節點
static final int SIGNAL = -1;
//說明執行緒在 condition 條件佇列等待
static final int CONDITION = -2;
//在共享模式中用,表明下一個共享執行緒應該無條件傳播
static final int PROPAGATE = -3;
//當前執行緒的等待狀態,除了以上四種值,還有一個值 0 為初始化狀態(條件佇列的節點除外)。
//注意這個值修改時是通過 CAS ,以保證執行緒安全。
volatile int waitStatus;
//前驅節點
volatile Node prev;
//後繼節點
volatile Node next;
//當前節點中的執行緒,通過建構函式初始化,出隊時會置空(這個後續說,重點強調)
volatile Thread thread;
//有兩種情況。1.在 condition 條件佇列中的後一個節點
//2. 一個特殊值 SHARED 用於表明當前是共享模式(因為條件佇列只存在於獨佔模式)
Node nextWaiter;
//是否是共享模式,理由同上
final boolean isShared() {
return nextWaiter == SHARED;
}
//返回前驅節點,如果為空丟擲空指標
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
```
另外,在 AQS 類中,還會記錄同步佇列的頭結點和尾結點:
```
//同步佇列的頭結點,是懶載入的,即不會立即建立一個同步佇列,
//只有當某個執行緒獲取不到鎖,需要排隊的時候,才會初始化頭結點
private transient volatile Node head;
//同步佇列的尾結點,同樣是懶載入。
private transient volatile Node tail;
```
## 獨佔鎖
這部分就結合 ReentrantLock 原始碼分析 AQS 的獨佔鎖是怎樣獲得和釋放鎖的。
### 非公平鎖
首先,我們從 ReentrantLock 開始分析,它有兩個構造方法,一個構造,可以傳入一個 boolean 型別的引數,表明是用公平鎖還是非公平鎖模式。另一個構造方法,不傳入任何引數,則預設用非公平鎖。
```
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
```
NonfairSync 和 FairSync 都繼承自 Sync ,它們都是 ReentranLock 的內部類。 而Sync 類又繼承自 AQS (AbstractQueuedSynchronizer)。
```
static final class NonfairSync extends Sync {
}
static final class FairSync extends Sync {
}
abstract static class Sync extends AbstractQueuedSynchronizer {
}
```
知道了它們之間的繼承關係,我們就從非公平鎖的加鎖方法作為入口,跟蹤原始碼。因為非公平鎖的流程講明白之後,公平鎖大致流程都一樣,只是多了一個條件判斷(這個,一會兒後邊細講,會做對比)。
**NonfairSync.lock**
我們看下公平鎖的獲取鎖的方法:
```
final void lock() {
//通過 CAS 操作把 state 設定為 1
if (compareAndSetState(0, 1))
//如果設值成功,說明加鎖成功,儲存當前獲得鎖的執行緒
setExclusiveOwnerThread(Thread.currentThread());
else
//如果加鎖失敗,則執行 AQS 的acquire 方法
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
**acquire**
這個方法的邏輯是:
1. 通過 tryAcquire 方法,嘗試獲取鎖,如果成功,則返回 true,失敗返回 false 。
2. tryAcquire 失敗之後,會先呼叫 addWaiter 方法,把當前執行緒封裝成 node 節點,加入同步佇列(獨佔模式)。
3. acquireQueued 方法會把剛加入佇列的 node 作為引數,通過自旋去獲得鎖。
**tryAcquire**
這是一個模板方法,具體的實現需要看它的子類,這裡對應的就是 ReentrantLock.NonfairSync.tryAcquire 方法。我們看一下:
```
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//當前執行緒
final Thread current = Thread.currentThread();
//獲取當前的同步狀態,若為 0 ,表示無鎖狀態。若大於 0,表示已經有執行緒搶到了鎖。
int c = getState();
if (c == 0) {
//然後通過 CAS 操作把 state 的值改為 1。
if (compareAndSetState(0, acquires)) {
// CAS 成功之後,儲存當前獲得鎖的執行緒
setExclusiveOwnerThread(current);
return true;
}
}
// 如果 state 大於0,則判斷當前執行緒是否是獲得鎖的執行緒,是的話,可重入。
else if (current == getExclusiveOwnerThread()) {
//由於 ReentrantLock 是可重入的,所以每重入一次 state 就加 1 。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
```
**addWaiter**
如果獲取鎖失敗之後,就會呼叫 addWaiter 方法,把當前執行緒加入同步佇列。
```
private Node addWaiter(Node mode) {
//把當前執行緒封裝成 Node ,並且是獨佔模式
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速入隊,如果失敗,則會呼叫 enq 入隊方法。enq 會初始化佇列。
Node pred = tail;
//如果 tail 不為空,說明當前佇列中已經有節點
if (pred != null) {
//把當前 node 的 prev 指標指向 tail
node.prev = pred;
//通過 CAS 把 node 設定為 tail,即新增到隊尾
if (compareAndSetTail(pred, node)) {
//把舊的 tail 節點的 next 指標指向當前 node
pred.next = node;
return node;
}
}
//當 tail 為空時,把 node 新增到佇列,如果需要的話,先進行佇列初始化
enq(node);
//入隊成功之後,返回當前 node
return node;
}
```
**enq**
通過自旋,把當前節點加入到佇列中
```
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果 tail為空,說明佇列未初始化
if (t == null) {
//建立一個空節點,通過 CAS把它設定為頭結點
if (compareAndSetHead(new Node()))
//此時只有一個 head頭節點,因此把 tail也指向它
tail = head;
} else {
//第二次自旋時,tail不為空,於是把當前節點的 prev指向 tail節點
node.prev = t;
//通過 CAS把 tail節點設定為當前 node節點
if (compareAndSetTail(t, node)) {
//把舊的 tail節點的 next指向當前 node
t.next = node;
return t;
}
}
}
}
```
**acquireQueued**
入隊成功之後,就會呼叫 acquireQueued 方法自旋搶鎖。
```
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前節點的前驅節點
final Node p = node.predecessor();
//如果前驅節點就是 head 節點,就呼叫 tryAcquire 方法搶鎖
if (p == head && tryAcquire(arg)) {
//如果搶鎖成功,就把當前 node 設定為頭結點
setHead(node);
p.next = null; // help GC
failed = false;
//搶鎖成功後,會把執行緒中斷標誌返回出去,終止for迴圈
return interrupted;
}
//如果搶鎖失敗,就根據前驅節點的 waitStatus 狀態判斷是否需要把當前執行緒掛起
if (shouldParkAfterFailedAcquire(p, node) &&
//執行緒被掛起時,判斷是否被中斷過
parkAndCheckInterrupt())
//注意此處,如果被執行緒被中斷過,需要把中斷標誌重新設定一下
interrupted = true;
}
} finally {
if (failed)
//如果丟擲異常,則取消鎖的獲取,進行出隊操作
cancelAcquire(node);
}
}
```
**setHead**
通過程式碼,我們可以看到,當前的同步佇列中,只有第二個節點才有資格搶鎖。如果搶鎖成功,則會把它設定為頭結點。
```
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
```
需要注意的是,這個方法,會把頭結點的執行緒設定為 null 。想一下,為什麼?
因為,此時頭結點的執行緒已經搶鎖成功,需要出隊了。自然的,佇列中也就不應該存在這個執行緒了。
PS:由 enq 方法,還有 setHead 方法,我們可以發現,頭結點的執行緒總是為 null。這是因為,頭結點要麼是剛初始化的空節點,要麼是搶到鎖的執行緒出隊了。因此,我們也常常把頭結點叫做虛擬節點(不儲存任何執行緒)。
**shouldParkAfterFailedAcquire**
以上是搶鎖成功的情況,那麼搶鎖失敗了呢?這時,我們需要判斷是否應該把當前執行緒掛起。
```
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取當前節點的前驅節點的 waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果 ws = -1 ,說明當前執行緒可以被前驅節點正常喚醒,於是就可以安全的 park了
return true;
if (ws > 0) {
//如果 ws > 0,說明前驅節點被取消,則會從當前節點依次向前查詢,
//直到找到第一個沒有被取消的節點,把那個節點的 next 指向當前 node
//這一步,是為了找到一個可以把當前執行緒喚起的前驅節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果 ws 為 0,或者 -3(共享鎖狀態),則把它設定為 -1
//返回 false,下次自旋時,就會判斷等於 -1,返回 true了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
```
**parkAndCheckInterrupt**
如果 shouldParkAfterFailedAcquire 返回 true,說明當前執行緒需要被掛起。因此,就執行此方法,同時檢查執行緒是否被中斷。
```
private final boolean parkAndCheckInterrupt() {
//把當前執行緒掛起,則 acquireQueued 方法的自旋就會暫停,等待前驅節點 unpark
LockSupport.park(this);
//返回當前節點是否被中斷的標誌,注意此方法會把執行緒的中斷標誌清除。
//因此,返回上一層方法時,需要設定 interrupted = true 把中斷標誌重新設定,以便上層程式碼可以處理中斷
return Thread.interrupted();
}
```
想一下,為什麼搶鎖失敗後,需要判斷是否把執行緒掛起?
因為,如果搶不到鎖,並且還不把執行緒掛起,acquireQueued 方法就會一直自旋下去,這樣你的CPU能受得了嗎。
**cancelAcquire**
當不停的自旋搶鎖時,若發生了異常,就會呼叫此方法,取消正在嘗試獲取鎖的執行緒。node 的位置分為三種情況,見下面註釋,
```
private void cancelAcquire(Node node) {
if (node == null)
return;
// node 不再指向任何執行緒
node.thread = null;
Node pred = node.prev;
//從當前節點不斷的向前查詢,直到找到一個有效的前驅節點
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//把 node 的 ws 設定為 -1
node.waitStatus = Node.CANCELLED;
// 1.如果 node 是 tail,則把 tail 更新為 node,並把 pred.next 指向 null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//2.如果 node 既不是 tail,也不是 head 的後繼節點,就把 node的前驅節點的 ws 設定為 -1
//最後把 node 的前驅節點的 next 指向 node 的後繼節點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//3.如果 node是 head 的後繼節點,則直接喚醒 node 的後繼節點。
//這個也很好理解,因為 node 是佇列中唯一有資格嘗試獲取鎖的節點,
//它放棄了資格,當然有義務把後繼節點喚醒,以讓後繼節點嘗試搶鎖。
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
```
**unparkSuccessor**
這個喚醒方法就比較簡單了,
```
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//從尾結點向前依次遍歷,直到找到距離當前 node 最近的一個有效節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//把這個有效節點的執行緒喚醒,
//喚醒之後,當前執行緒就可以繼續自旋搶鎖了,(回到 park 的地方)
LockSupport.unpark(s.thread);
}
```
下面畫一個流程圖更直觀的檢視整個獲取鎖的過程。
![](https://img2020.cnblogs.com/other/1714084/202003/1714084-20200313214616066-1438351358.jpg)
### 公平鎖
公平鎖和非公平鎖的整體流程大致相同,只是在搶鎖之前先判斷一下是否已經有人排在前面,如果有的話,就不執行搶鎖。我們通過原始碼追蹤到 FairSync.tryAcquire 方法。會發現,多了一個 hasQueuedPredecessors 方法。
**hasQueuedPredecessors**
這個方法判斷邏輯稍微有點複雜,有多種情況。
```
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
```
1. 如果 h == t,說明 h 和 t 都為空(此時佇列還未初始化)或者它們是同一個節點(說明佇列已經初始化,並且只有一個節點,此時為 enq 方法第一次自旋成功後)。此時,返回false。
2. 如果 h != t,則判斷 head.next == null 是否成立,如果成立,則返回 true。這種情況發生在有其他執行緒第一次入隊時。在 AQS 的 enq 入隊方法,設定頭結點成功之後 compareAndSetHead(new Node()) ,還未執行 tail = head 時(仔細想一想為什麼?)。此時 tail = null , head = new Node(),head.next = null。
3. 如果 h != t,並且 head.next != null,說明此時佇列中至少已經有兩個節點,則判斷 head.next 是否是當前執行緒。如果是,返回 false(注意是 false哦,因為用了 !),否則返回 true 。
總結:以上幾種情況,只有最終返回 false 時,才會繼續往下執行。因為 false,說明沒有執行緒排在當前執行緒前面,於是通過 CAS 嘗試把 state 值設定為 1。若成功,則方法返回。若失敗,同樣需要去排隊。
### 公平鎖和非公平鎖區別
舉個例子來對比公平鎖和非公平鎖。比如,現在到飯點了,大家都到食堂打飯。把佇列中的節點比作排隊打飯的人,每個打飯視窗都有一個管理員,只有排隊的人從管理員手中搶到鎖,才有資格打飯。打飯的過程就是執行緒執行的過程。
![](https://img2020.cnblogs.com/other/1714084/202003/1714084-20200313214616329-1001586117.jpg)
如果,你發現前面沒有人在排隊,那麼就可以直接從管理員手中拿到鎖,然後打飯。對於公平鎖來說,如果你前面有人在打飯,那麼你就要排隊到他後面(圖中B),等他打完之後,把鎖還給管理員。那麼,你就可以從管理員手中拿到鎖,然後打飯了。後面的人依次排隊。這就是FIFO先進先出的佇列模型。
對於非公平鎖來說,如果你是圖中的 B,當 A 把鎖還給管理員後,有可能有另外一個 D 插隊過來直接把鎖搶走。那麼,他就可以打飯,你只能繼續等待了。
所以,可以看出來。公平鎖是嚴格按照排隊的順序來的,先來後到嘛,你來的早,就可以早點獲取鎖。優點是,這樣不會造成某個執行緒等待時間過長,因為大家都是中規中矩的在排隊。而缺點呢,就是會頻繁的喚起執行緒,增加 CPU的開銷。
非公平鎖的優點是吞吐量大,因為有可能正好鎖可用,然後執行緒來了,直接搶到鎖了,不用排隊了,這樣也減少了 CPU 喚醒排隊執行緒的開銷。 但是,缺點也很明顯,你說我排隊排了好長時間了,終於輪到我打飯了,憑什麼其他人剛過來就插到我前面,比我還先打到飯,也太不公平了吧,後邊一大堆排隊的人更是怨聲載道。這要是每個人來了都插到我前面去,我豈不是要餓死了。
### 獨佔鎖的釋放
我們從 ReentrantLock 的 unlock 方法看起:
```
public void unlock() {
//呼叫 AQS 的 release 方法
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//如果頭結點不為空,並且 ws 不為 0,則喚起後繼節點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
```
這段邏輯比較簡單,當執行緒釋放鎖之後,就會喚醒後繼節點。 unparkSuccessor 已講,不再贅述。然後看下 **tryRelease** 方法,公平鎖和非公平鎖走的是同一個方法。
```
protected final boolean tryRelease(int releases) {
//每釋放一次鎖,state 值就會減 1,因為之前可能有鎖的重入
int c = getState() - releases;
//如果當前執行緒不是搶到鎖的執行緒,則丟擲異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//只有 state 的值減到 0 的時候,才會全部釋放鎖
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
```
因為,ReentrantLock 支援鎖的重入,所以每次重入 state 值都會加 1,相應的每次釋放鎖, state 的值也會減 1 。所以,這也是為什麼每個 lock 方法最後都要有一個 unlock 方法釋放鎖,它們的個數需要保證相同。
當 state 值為 0 的時候,說明鎖完全釋放。其他執行緒才可以有機會搶到鎖。
## 結語
以上已經講解了獨佔鎖主要的獲取方法 acquire ,另外還有一些其他相關方法,不再贅述,因為主要邏輯都是一樣的,只有部分稍有不同,只要理解了 acquire ,這些都是相通的。如 acquireInterruptibly 方法,它可以在獲取鎖的時候響應中斷。還有超時獲取鎖的方法 doAcquireNanos 可以設定獲取鎖的超時時間,超時之後就返回失敗。
下篇預告:分析 ReentrantReadWriteLock 讀寫鎖原始碼,以及 AQS 共享鎖的獲取和釋放,敬請期待。
如果本文對你有用,歡迎點贊,評論,轉發。
學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章