ReentrantLock實現原理深入探究
本文轉載自【五月的倉頡】的部落格 原文
本文知識點
- 前言
- 什麼是AbstractQueuedSynchronizer?
- ReentrantLock的實現原理
- lock()實現原理
- unlock()實現原理
- ReentrantLock其他方法的實現
- 小結
前言
網上寫ReentrantLock的使用、ReentrantLock和synchronized的區別的文章很多,研究ReentrantLock並且能講清楚ReentrantLock的原理的文章很少,本文就來研究一下ReentrantLock的實現原理。研究ReentrantLock的實現原理需要比較好的Java基礎以及閱讀程式碼的能力,有些朋友看不懂沒關係,可以以後看,相信你一定會有所收穫。
ReentrantLock是基於AQS實現的,這在下面會講到,AQS的基礎又是CAS,如果不是很熟悉CAS的朋友,可以看一下這篇文章
什麼是AbstractQueuedSynchronizer?
ReentrantLock實現的前提就是AbstractQueuedSynchronizer,簡稱AQS,是java.util.concurrent的核心,CountDownLatch、FutureTask、Semaphore、ReentrantLock等都有一個內部類是這個抽象類的子類。先用兩張表格介紹一下AQS。第一個講的是Node,由於AQS是基於FIFO佇列的實現,因此必然存在一個個節點,Node就是一個節點。
Node裡面的部分屬性
屬性 | 定義 |
---|---|
Node SHARED = new Node() | 表示Node處於共享模式 |
Node EXCLUSIVE = null | 表示Node處於獨佔模式 |
int CANCELLED = 1 | 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收 |
int SIGNAL = -1 | 表示這個Node的繼任Node被阻塞了,到時需要通知它 |
int CONDITION = -2 | 表示這個Node在條件佇列中,因為等待某個條件而被阻塞 |
int PROPAGATE = -3 | 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播 |
int waitStatus | 0,新Node會處於這種狀態 |
Node prev | 佇列中某個Node的前驅Node |
Node next | 佇列中某個Node的後繼Node |
Thread thread | 這個Node持有的執行緒,表示等待鎖的執行緒 |
Node nextWaiter | 表示下一個等待condition的Node |
看完了Node,下面再看一下AQS中有哪些變數和方法:
AQS裡面的部分屬性
屬性/方法 | 含義 |
---|---|
Thread exclusiveOwnerThread | 這個是AQS父類AbstractOwnableSynchronizer的屬性,表示獨佔模式同步器的當前擁有者 |
Node | 上面已經介紹過了,FIFO佇列的基本單位 |
Node head | FIFO佇列中的頭Node |
Node tail | FIFO佇列中的尾Node |
int state | 同步狀態,0表示未鎖 |
int getState() | 獲取同步狀態 |
setState(int newState) | 設定同步狀態 |
boolean compareAndSetState(int expect,int update) | 利用CAS進行State的設定 |
long spinForTimeoutThreshold = 1000L | 執行緒自旋等待的時間 |
Node enq(final Node node) | 插入一個Node到FIFO佇列中 |
Node addWaiter(Node mode) | 為當前執行緒和指定模式建立並擴充一個等待佇列 |
void setHead(Node node) | 設定佇列的頭Node |
void unparkSuccessor(Node node) | 如果存在的話,喚起Node持有的執行緒 |
void doReleaseShared() | 共享模式下做釋放鎖的動作 |
void cancelAcquire(Node node) | 取消正在進行的Node獲取鎖的嘗試 |
boolean shouldParkAfterFailedAcquire(Node pred,Node node) | 在嘗試獲取鎖失敗後是否應該禁用當前執行緒並等待 |
void selfInterrupt() | 中斷當前執行緒本身 |
boolean parkAndCheckInterrupt() | 禁用當前執行緒進入等待狀態並中斷執行緒本身 |
boolean acquireQueued(final Node node,int arg) | 佇列中的執行緒獲取鎖 |
tryAcquire(int arg) | 嘗試獲得鎖由AQS的子類實現它 |
tryRelease(int arg) | 嘗試釋放鎖由AQS的子類實現它 |
isHeldExclusively() | 是否獨自持有鎖 |
acquire(int arg) | 獲取鎖 |
release(int arg) | 釋放鎖 |
compareAndSetHead(Node update) | 利用CAS設定頭Node |
compareAndSetTail(Node expect,Node update) | 利用CAS設定尾Node |
compareAndSetWaitStatus(Node node,int expect,int update) | 利用CAS設定某個Node中的等待狀態 |
上面列出了AQS中最主要的一些方法和屬性。整個AQS是典型的模板模式的應用,設計得十分精巧,對於FIFO佇列的各種操作在AQS中已經實現了,AQS的子類一般只需要重寫tryAcquire(int arg)和tryRelease(int arg)兩個方法即可。
ReentrantLock實現原理(原始碼較多)
ReentrantLock中有一個抽象類Sync:
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
複製程式碼
ReentrantLock根據傳入構造方法的布林型引數例項化出Sync的實現類FairSync和NonfairSync,分別表示公平的Sync和非公平的Sync。由於ReentrantLock我們用的比較多的是非公平鎖,所以看下非公平鎖是如何實現的。假設執行緒1呼叫了ReentrantLock的lock()方法,那麼執行緒1將會獨佔鎖,整個呼叫鏈十分簡單:
第一個獲取鎖的執行緒就做了兩件事情:- 1、設定AbstractQueuedSynchronizer的state為1
- 2、設定AbstractOwnableSynchronizer的thread為當前執行緒
這兩步做完之後就表示執行緒1獨佔了鎖。然後執行緒2也要嘗試獲取同一個鎖,線上程1沒有釋放鎖的情況下必然是行不通的,所以執行緒2就要阻塞。那麼,執行緒2如何被阻塞?看下執行緒2的方法呼叫鏈,這就比較複雜了:
呼叫鏈看到確實非常長,沒關係,結合程式碼分析一下,其實ReentrantLock沒有那麼複雜,我們一點點來扒程式碼:lock()的實現原理
final void lock() {
if (compareAndSetState(0,1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
複製程式碼
首先執行緒2嘗試利用CAS去判斷state是不是0,是0就設定為1,當然這一步操作肯定是失敗的,因為執行緒1已經將state設定成了1,所以第2行必定是false,因此執行緒2走第5行的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
複製程式碼
從字面上就很好理解這個if的意思,先走第一個判斷條件嘗試獲取一次鎖,如果獲取的結果為false即失敗,走第二個判斷條件新增FIFO等待佇列。所以先看一下tryAcquire方法做了什麼,這個方法最終呼叫到的是Sync的nonfairTryAcquire方法:
1.final boolean nonfairTryAcquire(int acquires) {
2. final Thread current = Thread.currentThread();
3. int c = getState();
4. if (c == 0) {
5. if (compareAndSetState(0,acquires)) {
6. setExclusiveOwnerThread(current);
7. return true;
8. }
9. }
10. else if (current == getExclusiveOwnerThread()) {
11. int nextc = c + acquires;
12. if (nextc < 0) // overflow
13. throw new Error("Maximum lock count exceeded");
14. setState(nextc);
15. return true;
16 }
17. return false;
18.}
複製程式碼
由於state是volatile的,所以state對執行緒2具有可見性,執行緒2拿到最新的state,再次判斷一下能否持有鎖(可能執行緒1同步程式碼執行得比較快,這會兒已經釋放了鎖),不可以就返回false。
注意一下第10~第16行,這段程式碼的作用是讓某個執行緒可以多次呼叫同一個ReentrantLock,每呼叫一次給state+1,由於某個執行緒已經持有了鎖,所以這裡不會有競爭,因此不需要利用CAS設定state(相當於一個偏向鎖)。從這段程式碼可以看到,nextc每次加1,當nextc<0的時候丟擲error,那麼同一個鎖最多能重入Integer.MAX_VALUE次,也就是2147483647。
然後就走到if的第二個判斷裡面了,先走AQS的addWaiter方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(),mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred,node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製程式碼
先建立一個當前執行緒的Node,模式為獨佔模式(因為傳入的mode是一個NULL),再判斷一下佇列上有沒有節點,沒有就建立一個佇列,因此走enq方法:
1.private Node enq(final Node node) {
2. for (;;) {
3. Node t = tail;
4. if (t == null) { // Must initialize
5. if (compareAndSetHead(new Node()))
6. tail = head;
7. } else {
8. node.prev = t;
9. if (compareAndSetTail(t,node)) {
10. t.next = node;
11. return t;
12. }
13. }
14. }
15.}
複製程式碼
這個方法其實畫一張圖應該比較好理解,形成一個佇列之後應該是這樣的:
每一步都用圖表示出來了,由於執行緒2所在的Node是第一個要等待的Node,因此FIFO佇列上肯定沒有內容,tail為null,走的就是第4行~第6行的程式碼邏輯。這裡用了CAS設定頭Node,當然有可能執行緒2設定頭Node的時候CPU切換了,執行緒3已經把頭Node設定好了形成了上圖所示的一個佇列,這時執行緒2再迴圈一次獲取tail,由於tail是volatile的,所以對執行緒2可見,執行緒2看見tail不為null,就走到了7行的else裡面去往尾Node後面新增自身。整個過程下來,形成了一個雙向佇列。最後走AQS的acquireQueued(node,1):1.final boolean acquireQueued(final Node node,int arg) {
2. boolean failed = true;
3. try {
4. boolean interrupted = false;
5. for (;;) {
6. final Node p = node.predecessor();
7. if (p == head && tryAcquire(arg)) {
8. setHead(node);
9. p.next = null; // help GC
10. failed = false;
11. return interrupted;
12. }
13. if (shouldParkAfterFailedAcquire(p,node) &&
14. parkAndCheckInterrupt())
15. interrupted = true;
16. }
17. } finally {
18. if (failed)
19. cancelAcquire(node);
20. }
21.}
複製程式碼
此時再做判斷,由於執行緒2是雙向佇列的真正的第一個Node(前面還有一個h),所以第5行~第11行再次判斷一下執行緒2能不能獲取鎖(可能這段時間內執行緒1已經執行完了把鎖釋放了,state從1變為了0),如果還是不行,先呼叫AQS的shouldParkAfterFailedAcquire(p,node)方法:
1.private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) {
2. int ws = pred.waitStatus;
3. if (ws == Node.SIGNAL)
4. /*
5. * This node has already set status asking a release
6. * to signal it,so it can safely park.
7. */
9. return true;
10. if (ws > 0) {
11. /*
12. * Predecessor was cancelled. Skip over predecessors and
13. * indicate retry.
14. */
15. do {
16. node.prev = pred = pred.prev;
17. } while (pred.waitStatus > 0);
18. pred.next = node;
19. } else {
20. /*
21. * waitStatus must be 0 or PROPAGATE. Indicate that we
22. * need a signal,but don't park yet. Caller will need to
23. * retry to make sure it cannot acquire before parking.
24. */
25. compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
26. }
27. return false;
28.}
複製程式碼
這個waitStatus是h的waitStatus,很明顯是0,所以此時把h的waitStatus設定為Noed.SIGNAL即-1並返回false。既然返回了false,上面的acquireQueued的13行if自然不成立,再走一次for迴圈,還是先嚐試獲取鎖,不成功,繼續走shouldParkAfterFailedAcquire,此時waitStatus為-1,走第3行的判斷,返回true。然後走acquireQueued的13行if的第二個判斷條件parkAndCheckInterrupt:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製程式碼
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t,blocker);
UNSAFE.park(false,0L);
setBlocker(t,null);
}
複製程式碼
最後一步,呼叫LockSupport(阻塞原語)的park方法阻塞住了當前的執行緒。至此,使用ReentrantLock讓執行緒1獨佔鎖、執行緒2進入FIFO佇列並阻塞的完整流程已經整理出來了。
lock()的操作明瞭之後,就要探究一下unlock()的時候程式碼又做了什麼了,接著看下一部分。
unlock的實現原理
就不畫流程圖了,直接看一下程式碼流程,比較簡單,呼叫ReentrantLock的unlock方法:
public void unlock() {
sync.release(1);
}
複製程式碼
走AQS的release:
1.public final boolean release(int arg) {
2. if (tryRelease(arg)) {
3. Node h = head;
4. if (h != null && h.waitStatus != 0)
5. unparkSuccessor(h);
6. return true;
7. }
8. return false;
9.}
複製程式碼
先呼叫Sync的tryRelease嘗試釋放鎖:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製程式碼
首先,只有當c==0的時候才會讓free=true,這和上面一個執行緒多次呼叫lock方法累加state是對應的,呼叫了多少次的lock()方法自然必須呼叫同樣次數的unlock()方法才行,這樣才把一個鎖給全部解開。
當一條執行緒對同一個ReentrantLock全部解鎖之後,AQS的state自然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread將被設定為null,這樣就表示沒有執行緒佔有鎖,方法返回true。程式碼繼續往下走,上面的release方法的第4行,h不為null成立,h的waitStatus為-1,不等於0也成立,所以走第5行的unparkSuccessor方法:
1.private void unparkSuccessor(Node node) {
2. int ws = node.waitStatus;
3. if (ws < 0)
4. compareAndSetWaitStatus(node,0);
5. Node s = node.next;
6. if (s == null || s.waitStatus > 0) {
7. s = null;
8. for (Node t = tail; t != null && t != node; t = t.prev)
9. if (t.waitStatus <= 0)
10. s = t;
11. }
12. if (s != null)
13. LockSupport.unpark(s.thread);
14.}
複製程式碼
s即h的下一個Node,這個Node裡面的執行緒就是執行緒2,由於這個Node不等於null,所以走12行,執行緒2被unPark了,得以執行。有一個很重要的問題是:鎖被解了怎樣保證整個FIFO佇列減少一個Node呢?這是一個很巧妙的設計,又回到了AQS的acquireQueued方法了:
1.final boolean acquireQueued(final Node node,node) &&
14. parkAndCheckInterrupt())
15. interrupted = true;
16. }
17. } finally {
18. if (failed)
19. cancelAcquire(node);
20. }
21.}
複製程式碼
被阻塞的執行緒2是被阻塞在第14行,注意這裡並沒有return語句,也就是說,阻塞完成執行緒2依然會進行for迴圈。然後,阻塞完成了,執行緒2所在的Node的前驅Node是p,執行緒2嘗試tryAcquire,成功,然後執行緒2就成為了head節點了,把p的next設定為null,這樣原頭Node裡面的所有物件都不指向任何塊記憶體空間,h屬於棧記憶體的內容,方法結束被自動回收,這樣隨著方法的呼叫完畢,原頭Node也沒有任何的引用指向它了,這樣它就被GC自動回收了。此時,遇到一個return語句,acquireQueued方法結束,後面的Node也是一樣的原理。
這裡有一個細節,看一下setHead方法:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
複製程式碼
setHead方法裡面的前驅Node是Null,也沒有執行緒,那麼為什麼不用一個在等待的執行緒作為Head Node呢?
因為一個執行緒隨時有可能因為中斷而取消,而取消的話,Node自然就要被GC了,那GC前必然要把頭Node的後繼Node變為一個新的頭而且要應對多種情況,這樣就很麻煩。用一個沒有thread的Node作為頭,相當於起了一個引導作用,因為head沒有執行緒,自然也不會被取消。
再看一下上面unparkSuccessor的5行~10行,就是為了防止head的下一個node被取消的情況,這樣,就從尾到頭遍歷,找出離head最近的一個node,對這個node進行unPark操作。
ReentrantLock其他方法的實現
如果能理解ReentrantLock的實現方式,那麼你會發現ReentrantLock中其餘一些方法的實現還是很簡單的,從JDK API關於ReentrantLock方法的介紹這部分,舉幾個例子:
- 1、int getHoldCount()
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
複製程式碼
獲取ReentrantLock的lock()方法被呼叫了幾次,就是state的當前值
- 2、Thread getOwner()
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
複製程式碼
獲取當前佔有鎖的執行緒,就是AbstractOwnableSynchronizer中exclusiveOwnerThread的值
- 3、Collection getQueuedThreads()
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
複製程式碼
從尾到頭遍歷一下,新增進ArrayList中
- 4、int getQueuedLength()
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
複製程式碼
從尾到頭遍歷一下,累加n。當然這個方法和上面那個方法可能是不準確的,因為遍歷的時候可能別的執行緒又往佇列尾部添加了Node。
其餘方法也都差不多,可以自己去看一下。
小結
本文從原始碼分析了ReentrantLock的非公平加鎖和釋放鎖的原理,通過原始碼我們得知ReentrantLock的基石是AQS,AQS的基石是Node,多執行緒競爭鎖時通過自旋方式等待資源,通過Unsafe提供的compareAndSwapObject方法達到多執行緒競爭鎖時只有一個執行緒可以獲取鎖的效果,從而實現了執行緒安全。
翻看了一下公平加鎖的原始碼,每次在獲取鎖時都去檢查FIFO佇列是否已經有等待執行緒,如果有就把自己追加到FIFO佇列的末尾,而非公平加鎖則是每次都先去嘗試獲取鎖,獲取不到再加入到FIFO佇列。
另外本文程式碼並非照搬原博主的程式碼,而是在jdk8的原始碼基礎上覆制過來的,與原博主貼的程式碼有稍許偏差,不過核心實現是一致的。