Java讀原始碼之ReentrantLock
阿新 • • 發佈:2020-04-07
## 前言
ReentrantLock 可重入鎖,應該是除了 synchronized 關鍵字外用的最多的執行緒同步手段了,雖然JVM維護者瘋狂優化 synchronized 使其已經擁有了很好的效能。但 ReentrantLock 仍有其存在價值,例如可以感知執行緒中斷,公平鎖模式,可以指定超時時間的搶鎖等更細粒度的控制都是目前的 synchronized 做不到的。
如果不是很瞭解 Java 中執行緒的一些基本概念,可以看之前這篇:
[Java讀原始碼之Thread](https://www.cnblogs.com/freshchen/p/11674575.html)
## 案例
用一個最簡單的案例引出我們的主角
```java
public class ReentrantLockDemo {
// 預設是非公平鎖和 synchronized 一樣
private static ReentrantLock reentrantLock = new ReentrantLock();
public void printThreadInfo(int num) {
reentrantLock.lock();
try {
System.out.println(num + " : " + Thread.currentThread().getName());
System.out.println(num + " : " + Thread.currentThread().toString());
} finally {
reentrantLock.unlock();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
IntStream.rangeClosed(0, 3)
.forEach(num -> executorService
.execute(() -> new ReentrantLockDemo().printThreadInfo(num))
);
}
/**
* 輸出:
* 0 : pool-1-thread-1
* 0 : Thread[pool-1-thread-1,5,main]
* 3 : pool-1-thread-4
* 3 : Thread[pool-1-thread-4,5,main]
* 1 : pool-1-thread-2
* 1 : Thread[pool-1-thread-2,5,main]
* 2 : pool-1-thread-3
* 2 : Thread[pool-1-thread-3,5,main]
*/
```
可以看到使用起來也很簡單,而且達到了同步的效果。廢話不多說一起來瞅一瞅 lock() 和 unlock() 兩個同步方法是怎麼實現的。
## 原始碼分析
### 公平鎖與非公平鎖
公平鎖顧名思義。就是每個執行緒排隊搶佔鎖資源。而非公平鎖執行緒什麼時候能執行更多的看緣分,例如一個執行緒需要執行臨界區程式碼,不管之前有多少執行緒在等,直接去搶鎖,說白了就是插隊。對於 ReentrantLock 的實現,從構造器看出,當我們傳入 true 代表選擇了公平鎖模式
```java
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
```
為什麼先看公平鎖實現,而不是預設的非公平鎖,因為 synchronized 就是非公平鎖,1.7開始 synchronized 的實現改變了,並且基本借鑑了 ReentrantLock 的實現,加入了自旋,偏向鎖減少系統呼叫,所以如果需要非公平鎖且不需要特別精細的控制,完全沒有必要因為效能選擇 ReentrantLock 了。
### AQS 結構
從案例中的 lock 方法進入
- **ReentrantLock.FairSync#lock**
```java
final void lock() {
// 要一把鎖,向誰要鎖?
acquire(1);
}
```
在繼續深入之前讓我們先熟悉一下 AbstractQueuedSynchronizer(AKA :AQS) 的結構
- 首先繼承了 AbstractOwnableSynchronizer ,主要屬性:
```java
// 儲存當前持有鎖的執行緒
private transient Thread exclusiveOwnerThread;
```
- AQS 自身主要屬性:
```java
// 阻塞佇列的頭
private transient volatile Node head;
// 阻塞佇列的尾
private transient volatile Node tail;
// 同步器的狀態
private volatile int state;
```
從 head 和 tail 可以猜想到,AQS 應該是用一個連結串列作為等待佇列,給等待的執行緒排隊, status 欄位預設是0,一旦鎖被某個執行緒佔有就 +1,那為啥要用int呢? 如果當前持有鎖的這個執行緒(exclusiveOwnerThread)還要再來把鎖,那狀態還可以繼續 +1,也就實現了可重入。
- 上面的 Node 節點長啥樣呢,不要被註釋中 CLH 鎖高大上的名稱嚇到,其實就是雙向連結串列,主要屬性:
```java
// 標識次節點是共享模式
static final Node SHARED = new Node();
// 標識次節點是獨佔模式
static final Node EXCLUSIVE = null;
// 節點裡裝著排隊的執行緒
volatile Thread thread;
// 節點裡裝的執行緒放棄了,不搶鎖了,可能超時了,可能中斷了
static final int CANCELLED = 1;
// 下一個節點裡的執行緒等待被通知出隊
static final int SIGNAL = -1;
// 節點裡裝的執行緒在等待執行條件,結合 Condition 使用
static final int CONDITION = -2;
// 節點狀態需要被傳播到下一個節點,主要用在共享模式
static final int PROPAGATE = -3;
// 標識節點的等待狀態,初始0,取值是上面的 -3 ~ 1
volatile int waitStatus;
// 前一個節點
volatile Node prev;
// 後一個節點
volatile Node next;
// 指向下一個等待條件 Condition
Node nextWaiter;
```
去掉一些普通情況不會涉及的屬性,如果有四個執行緒競爭,結構如下圖所示:
![](https://cdn.jsdelivr.net/gh/freshchen/resource/img/draw/aqs-1-1.png)
可以看到就是一個標準的頭節點為空的雙鏈表,為什麼頭節點是空?
### 公平鎖加鎖
- **AbstractQueuedSynchronizer#acquire**
```java
public final void acquire(int arg) {
// 如果嘗試拿鎖沒成功,那就進等待佇列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 檢測到執行緒被中斷了,因為重置了中斷訊號但沒做處理,再設定下中斷位,讓使用者去處理,中斷標準操作
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
```
- **ReentrantLock.FairSync#tryAcquire**
```java
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 取AQS的 state 值
int c = getState();
// 當前沒有執行緒持有鎖
if (c == 0) {
// 如果沒有其他執行緒在排隊(公平)
if (!hasQueuedPredecessors() &&
// 這裡可能存在競爭 CAS 試著去搶一次鎖
compareAndSetState(0, acquires)) {
// 搶到鎖了,把鎖持有者改成自己,其他執行緒往後稍稍
setExclusiveOwnerThread(current);
return true;
}
}
// 鎖已經被持有了,但如果鎖主人就是自己,那歡迎光臨(可重入)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因為其他執行緒進不來,這裡不存在競爭,直接改鎖狀態
setState(nextc);
return true;
}
return false;
}
```
- **AbstractQueuedSynchronizer#hasQueuedPredecessors**
```java
// 返回 false 代表不需要排隊,true 代表要排隊
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h == t 頭等於尾只可能是剛初始的狀態或者已經沒有節點等待了
// h.next == null ? 下面介紹進隊的過程中,如果其他執行緒與此同時 tryAcquire 成功了,會把之前的head.next置為空,說明被捷足先登了,差一點可惜
// 如果到最後一個判斷了,也就是佇列中至少有一個等待節點,直接看第一個等待節點是不是自己,如果不是自己就乖乖排隊去
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
```
- **AbstractQueuedSynchronizer#addWaiter**
tryAcquire如果沒有拿到鎖,就需要進等待隊列了,變成一個 Node 例項
```java
// 這裡 mode 為獨佔模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果尾節點不為空,說明等待佇列已經初始化過
if (pred != null) {
node.prev = pred;
// 嘗試把自己放到隊尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 進來這裡說明,等待佇列沒有被初始化過,或者嘗試失敗了
enq(node);
return node;
}
```
- **AbstractQueuedSynchronizer#enq**
```java
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果尾節點是空,說明佇列沒有初始化
if (t == null) {
// 初始化一個空節點(延遲載入),head ,tail都指向它
if (compareAndSetHead(new Node()))
tail = head;
}
// 一直嘗試把自己塞到隊尾(自旋)
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
- **AbstractQueuedSynchronizer#acquireQueued**
addWaiter方法已經把等待執行緒包裝成節點放到等待隊列了,為啥要返回中斷標識呢?主要是為了給一些需要處理中斷的方式複用,例如 **ReentrantLock#lockInterruptibly**,以及帶超時的鎖
```java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 這邊邏輯開始繞起來了
for (;;) {
// 拿前一個節點
final Node p = node.predecessor();
// 前一個節點是head,說明自己排在第一個
if (p == head &&
// 在讓出cpu前再試一次,此時可能鎖持有者已經讓位了
tryAcquire(arg)) {
// 搶到鎖了
setHead(node);
// 把之前沒用的頭節點釋放
p.next = null; // help GC
failed = false;
return interrupted;
}
// 兩次嘗試都失敗了,只能準備被掛起,讓出cpu了(調了核心,重量級)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 普通的鎖不處理中斷異常,不會進這個方法
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
// 把頭節點設為自己
head = node;
// 因為已經搶到鎖了,不需要記錄這個執行緒在等待了,保持了頭節點中執行緒永遠為 null
node.thread = null;
node.prev = null;
}
```
- **AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire**
```java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 已經告訴前一個節點自己需要被通知了
return true;
if (ws > 0) {
// 只有 CANCELLED 這個狀態大於0,如果前面的節點不排隊了,就一直找到一個沒 CANCELLED 的
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
// 進到這裡,只剩下PROPAGATE(共享鎖時候才有) CONDITION(本文不涉及) 和 未賦值狀態也就是0,
else {
// 這裡把 預設狀態0 置為 -1,也就代表著後面有執行緒在等著被喚醒了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false,就暫時不會讓執行緒掛起,繼續自旋,直到返回true
return false;
}
```
- **AbstractQueuedSynchronizer#parkAndCheckInterrupt**
```java
private final boolean parkAndCheckInterrupt() {
// 掛起,標準用法this充當blocker
LockSupport.park(this);
// 一旦恢復,返回執行緒在掛起階段是否被中斷,此方法會重置中斷位
return Thread.interrupted();
}
```
到這裡加鎖流程就介紹差不多了,用一個最簡單流程的圖來總結一下:
![](https://cdn.jsdelivr.net/gh/freshchen/resource/img/draw/aqs-1-2.png)
### 公平鎖解鎖
- **AbstractQueuedSynchronizer#release**
```java
public final boolean release(int arg) {
// 嘗試釋放鎖
if (tryRelease(arg)) {
Node h = head;
// 如果等待佇列已經被初始化過,並且後面有節點等待操作
if (h != null && h.waitStatus != 0)
// 恢復掛起的執行緒
unparkSuccessor(h);
return true;
}
return false;
}
```
- **ReentrantLock.FairSync#tryRelease**
```java
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 能執行釋放鎖的肯定是鎖的持有者,除非虛擬機器魔怔了
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 考慮可重入
if (c == 0) {
free = true;
// 鎖現在沒有持有者了
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
```
- **ReentrantLock.FairSync#unparkSuccessor**
```java
// node 是頭節點
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果狀態不是 CANCELED,就把狀態置為初始狀態
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// s == null 這個條件成立主要是在共享模式下自旋釋放。
if (s == null || s.waitStatus > 0) {
// 把 CANCELED 狀態的節點置為空
s = null;
// 因為 head 這條路已經斷了,從尾巴開始找到第一個排隊的節點,然後把佇列接上
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 把第一個排隊的節點中的執行緒喚醒,
LockSupport.unpark(s.thread);
}
```
執行緒從加鎖程式碼裡介紹的 **AbstractQueuedSynchronizer#parkAndCheckInterrupt** 方法中醒來,繼續自旋拿鎖。如果此時後面還有人排隊就一定能拿到鎖了。如圖所示:
![](https://cdn.jsdelivr.net/gh/freshchen/resource/img/draw/aqs-1-3.png)
### 非公平鎖加鎖
- **ReentrantLock.NonfairSync#lock**
```java
final void lock() {
// 不管三七二十一,直接搶鎖,如果運氣好,鎖正好被釋放了,就不排隊了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 和上面介紹的公平鎖一樣,只是 tryAcquire 實現不一樣
acquire(1);
}
```
- **ReentrantLock.Sync#nonfairTryAcquire**
上面公平鎖我們已經知道,執行緒真正掛起前會嘗試兩次,由於不考慮別人有沒有入隊,實現非常簡單
```java
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果沒有執行緒持有鎖,直接搶鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是重入,狀態累加
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
```
### 非公平鎖解鎖
因為都是獨佔鎖模式,解鎖和公平鎖邏輯一樣。
## 總結
至此,總算看完了 ReentrantLock 常規的加鎖解鎖原始碼,好好體會下 AQS 的結構,還是能看懂的,且很有收穫,總之 Doug Lea 大神牛B。
本文還是挖了很多坑的:
- 帶超時的鎖是如何實現的?
- 檢測中斷的鎖是如何實現的?
- 各種 Condition 是如何實現的?
- 鎖的共享模式是如何實現的?
以後有時間再一探究