Java同步器之ReentrantReadWriteLock原始碼解析
一、簡介
讀寫鎖是一種特殊的鎖,它把對共享資源的訪問分為讀訪問和寫訪問,多個執行緒可以同時對共享資源進行讀訪問,但是同一時間只能有一個執行緒對共享資源進行寫訪問,使用讀寫鎖可以極大地提高併發量。
二、特性
讀寫鎖具有以下特性:
是否互斥 | 讀 | 寫 |
---|---|---|
讀 | 否 | 是 |
寫 | 是 | 是 |
可以看到,讀寫鎖除了讀讀不互斥,讀寫、寫讀、寫寫都是互斥的。
那麼,ReentrantReadWriteLock
是怎麼實現讀寫鎖的呢?
三、類結構
在看原始碼之前,我們還是先來看一下ReentrantReadWriteLock
這個類的主要結構。
ReentrantReadWriteLock
中的類分成三個部分:
-
ReentrantReadWriteLock
本身實現了ReadWriteLock
介面,這個介面只提供了兩個方法readLock()
和writeLock()
; - 同步器,包含一個繼承了
AQS
的Sync
內部類,以及其兩個子類FairSync
和NonfairSync
; -
ReadLock
和WriteLock
兩個內部類實現了Lock
介面,它們具有鎖的一些特性。
四、原始碼分析
4.1 屬性
// 讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // 寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器 final Sync sync;
維護了讀鎖、寫鎖和同步器。
4.2 構造方法
// 預設構造方法
public ReentrantReadWriteLock() {
this(false);
}
// 是否使用公平鎖的構造方法
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
它提供了兩個構造方法,預設構造方法使用的是非公平鎖模式,在構造方法中初始化了讀鎖和寫鎖。
4.3 獲取讀鎖和寫鎖的方法
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
屬性中的讀鎖和寫鎖是私有屬性,通過這兩個方法暴露出去。
下面我們主要分析讀鎖和寫鎖的加鎖、解鎖方法,且都是基於非公平模式的。
4.4 ReadLock.lock()
// ReentrantReadWriteLock.ReadLock.lock()
public void lock() {
sync.acquireShared(1);
}
// AbstractQueuedSynchronizer.acquireShared()
public final void acquireShared(int arg) {
// 嘗試獲取共享鎖(返回1表示成功,返回-1表示失敗)
if (tryAcquireShared(arg) < 0)
// 失敗了就可能要排隊
doAcquireShared(arg);
}
// ReentrantReadWriteLock.Sync.tryAcquireShared()
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 狀態變數的值
// 在讀寫鎖模式下,高16位儲存的是共享鎖(讀鎖)被獲取的次數,低16位儲存的是互斥鎖(寫鎖)被獲取的次數
int c = getState();
// 互斥鎖的次數
// 如果其它執行緒獲得了寫鎖,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 讀鎖被獲取的次數
int r = sharedCount(c);
// 下面說明此時還沒有寫鎖,嘗試去更新state的值獲取讀鎖
// 讀者是否需要排隊(是否是公平模式)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 獲取讀鎖成功
if (r == 0) {
// 如果之前還沒有執行緒獲取讀鎖
// 記錄第一個讀者為當前執行緒
firstReader = current;
// 第一個讀者重入的次數為1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果有執行緒獲取了讀鎖且是當前執行緒是第一個讀者
// 則把其重入次數加1
firstReaderHoldCount++;
} else {
// 如果有執行緒獲取了讀鎖且當前執行緒不是第一個讀者
// 則從快取中獲取重入次數儲存器
HoldCounter rh = cachedHoldCounter;
// 如果快取不屬性當前執行緒
// 再從ThreadLocal中獲取
// readHolds本身是一個ThreadLocal,裡面儲存的是HoldCounter
if (rh == null || rh.tid != getThreadId(current))
// get()的時候會初始化rh
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 如果rh的次數為0,把它放到ThreadLocal中去
readHolds.set(rh);
// 重入的次數加1(初始次數為0)
rh.count++;
}
// 獲取讀鎖成功,返回1
return 1;
}
// 通過這個方法再去嘗試獲取讀鎖(如果之前其它執行緒獲取了寫鎖,一樣返回-1表示失敗)
return fullTryAcquireShared(current);
}
// AbstractQueuedSynchronizer.doAcquireShared()
private void doAcquireShared(int arg) {
// 進入AQS的佇列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 當前節點的前一個節點
final Node p = node.predecessor();
// 如果前一個節點是頭節點(說明是第一個排隊的節點)
if (p == head) {
// 再次嘗試獲取讀鎖
int r = tryAcquireShared(arg);
// 如果成功了
if (r >= 0) {
// 頭節點後移並傳播
// 傳播即喚醒後面連續的讀節點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 沒獲取到讀鎖,阻塞並等待被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer.setHeadAndPropagate()
private void setHeadAndPropagate(Node node, int propagate) {
// h為舊的頭節點
Node h = head;
// 設定當前節點為新頭節點
setHead(node);
// 如果舊的頭節點或新的頭節點為空或者其等待狀態小於0(表示狀態為SIGNAL/PROPAGATE)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 需要傳播
// 取下一個節點
Node s = node.next;
// 如果下一個節點為空,或者是需要獲取讀鎖的節點
if (s == null || s.isShared())
// 喚醒下一個節點
doReleaseShared();
}
}
// AbstractQueuedSynchronizer.doReleaseShared()
// 這個方法只會喚醒一個節點
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節點狀態為SIGNAL,說明要喚醒下一個節點
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒下一個節點
unparkSuccessor(h);
}
else if (ws == 0 &&
// 把頭節點的狀態改為PROPAGATE成功才會跳到下面的if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果喚醒後head沒變,則跳出迴圈
if (h == head) // loop if head changed
break;
}
}
看完ReentrantLock原始碼解析的分析再看這章的內容應該會比較簡單,中間一樣的方法我們這裡直接跳過了。
我們來看看大致的邏輯:
- 先嚐試獲取讀鎖;
- 如果成功了直接結束;
- 如果失敗了,進入
doAcquireShared()
方法; -
doAcquireShared()
方法中首先會生成一個新節點並進入AQS
佇列中; - 如果頭節點正好是當前節點的上一個節點,再次嘗試獲取鎖;
- 如果成功了,則設定頭節點為新節點,並傳播;
- 傳播即喚醒下一個讀節點(如果下一個節點是讀節點的話);
- 如果頭節點不是當前節點的上一個節點或者(5)失敗,則阻塞當前執行緒等待被喚醒;
- 喚醒之後繼續走(5)的邏輯;
在整個邏輯中是在哪裡連續喚醒讀節點的呢?
答案是在doAcquireShared()
方法中,在這裡一個節點A
獲取了讀鎖後,會喚醒下一個讀節點B
,這時候B
也會獲取讀鎖,然後B
繼續喚醒C
,依次往復,也就是說這裡的節點是一個喚醒一個這樣的形式,而不是一個節點獲取了讀鎖後一次性喚醒後面所有的讀節點。
4.5 ReadLock.unlock()
// java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock.unlock
public void unlock() {
sync.releaseShared(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared
public final boolean releaseShared(int arg) {
// 如果嘗試釋放成功了,就喚醒下一個節點
if (tryReleaseShared(arg)) {
// 這個方法實際是喚醒下一個節點
doReleaseShared();
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 如果第一個讀者(讀執行緒)是當前執行緒
// 就把它重入的次數減1
// 如果減到0了就把第一個讀者置為空
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 如果第一個讀者不是當前執行緒
// 一樣地,把它重入的次數減1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 共享鎖獲取的次數減1
// 如果減為0了說明完全釋放了,才返回true
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.doReleaseShared
// 行為跟方法名有點不符,實際是喚醒下一個節點
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果頭節點狀態為SIGNAL,說明要喚醒下一個節點
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒下一個節點
unparkSuccessor(h);
}
else if (ws == 0 &&
// 把頭節點的狀態改為PROPAGATE成功才會跳到下面的if
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果喚醒後head沒變,則跳出迴圈
if (h == head) // loop if head changed
break;
}
}
解鎖的大致流程如下:
- 將當前執行緒重入的次數減
1
; - 將共享鎖總共被獲取的次數減
1
; - 如果共享鎖獲取的次數減為
0
了,說明共享鎖完全釋放了,那就喚醒下一個節點;
如下圖,ABC
三個節點各獲取了一次共享鎖,三者釋放的順序分別為ACB
,那麼最後B
釋放共享鎖的時候tryReleaseShared()
才會返回true
,進而才會喚醒下一個節點D
。
4.6 WriteLock.lock()
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.lock()
public void lock() {
sync.acquire(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
// 先嚐試獲取鎖
// 如果失敗,則會進入佇列中排隊,後面的邏輯跟ReentrantLock一模一樣了
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 狀態變數state的值
int c = getState();
// 互斥鎖被獲取的次數
int w = exclusiveCount(c);
if (c != 0) {
// 如果c!=0且w==0,說明共享鎖被獲取的次數不為0
// 這句話整個的意思就是
// 如果共享鎖被獲取的次數不為0,或者被其它執行緒獲取了互斥鎖(寫鎖)
// 那麼就返回false,獲取寫鎖失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 溢位檢測
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 到這裡說明當前執行緒已經獲取過寫鎖,這裡是重入了,直接把state加1即可
setState(c + acquires);
// 獲取寫鎖成功
return true;
}
// 如果c等於0,就嘗試更新state的值(非公平模式writerShouldBlock()返回false)
// 如果失敗了,說明獲取寫鎖失敗,返回false
// 如果成功了,說明獲取寫鎖成功,把自己設定為佔有者,並返回true
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 獲取寫鎖失敗了後面的邏輯跟ReentrantLock是一致的,進入佇列排隊,這裡就不列原始碼了
寫鎖獲取的過程大致如下:
- 嘗試獲取鎖;
- 如果有讀者佔有著讀鎖,嘗試獲取寫鎖失敗;
- 如果有其它執行緒佔有著寫鎖,嘗試獲取寫鎖失敗;
- 如果是當前執行緒佔有著寫鎖,嘗試獲取寫鎖成功,
state
值加1
; - 如果沒有執行緒佔有著鎖(state==0),當前執行緒嘗試更新
state
的值,成功了表示嘗試獲取鎖成功,否則失敗; - 嘗試獲取鎖失敗以後,進入佇列排隊,等待被喚醒;
- 後續邏輯跟
ReentrantLock
是一致;
4.7 WriteLock.unlock()
// java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock.unlock()
public void unlock() {
sync.release(1);
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
// 如果嘗試釋放鎖成功(完全釋放鎖)
// 就嘗試喚醒下一個節點
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// java.util.concurrent.locks.ReentrantReadWriteLock.Sync.tryRelease()
protected final boolean tryRelease(int releases) {
// 如果寫鎖不是當前執行緒佔有著,丟擲異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 狀態變數的值減1
int nextc = getState() - releases;
// 是否完全釋放鎖
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 設定狀態變數的值
setState(nextc);
// 如果完全釋放了寫鎖,返回true
return free;
}
寫鎖釋放的過程大致為:
- 先嚐試釋放鎖,即狀態變數
state
的值減1
; - 如果減為
0
了,說明完全釋放了鎖; - 完全釋放了鎖才喚醒下一個等待的節點;
五、總結
-
ReentrantReadWriteLock
採用讀寫鎖的思想,能提高併發的吞吐量; - 讀鎖使用的是共享鎖,多個讀鎖可以一起獲取鎖,互相不會影響,即讀讀不互斥;
- 讀寫、寫讀和寫寫是會互斥的,前者佔有著鎖,後者需要進入
AQS
佇列中排隊; - 多個連續的讀執行緒是一個接著一個被喚醒的,而不是一次性喚醒所有讀執行緒;
- 只有多個讀鎖都完全釋放了才會喚醒下一個寫執行緒;
- 只有寫鎖完全釋放了才會喚醒下一個等待者,這個等待者有可能是讀執行緒,也可能是寫執行緒;
六、拓展
- 如果同一個執行緒先獲取讀鎖,再獲取寫鎖會怎樣?
分析上圖中的程式碼,在tryAcquire()
方法中,如果讀鎖被獲取的次數不為0(c != 0 && w == 0)
,返回false
,返回之後外層方法會讓當前執行緒阻塞。
可以通過下面的方法驗證:
readLock.lock();
writeLock.lock();
writeLock.unlock();
readLock.unlock();
執行程式後會發現程式碼停止在writeLock.lock();
,當然,你也可以打個斷點跟蹤進去看看。
- 如果同一個執行緒先獲取寫鎖,再獲取讀鎖會怎樣?
分析上面的程式碼,在tryAcquireShared()
方法中,第一個紅框處並不會返回,因為不滿足getExclusiveOwnerThread() != current
;第二個紅框處如果原子更新成功就說明獲取了讀鎖,然後就會執行第三個紅框處的程式碼把其重入次數更改為1
。
可以通過下面的方法驗證:
writeLock.lock();
readLock.lock();
readLock.unlock();
writeLock.unlock();
你可以打個斷點跟蹤一下看看。
- 死鎖了麼?
通過上面的兩個例子,我們可以感受到同一個執行緒先讀後寫和先寫後讀是完全不一樣的,為什麼不一樣呢?
先讀後寫,一個執行緒佔有讀鎖後,其它執行緒還是可以佔有讀鎖的,這時候如果在其它執行緒佔有讀鎖之前讓自己佔有了寫鎖,其它執行緒又不能佔有讀鎖了,這段程式會非常難實現,邏輯也很奇怪,所以,設計成只要一個執行緒佔有了讀鎖,其它執行緒包括它自己都不能再獲取寫鎖。
先寫後讀,一個執行緒佔有寫鎖後,其它執行緒是不能佔有任何鎖的,這時候,即使自己佔有一個讀鎖,對程式的邏輯也不會有任何影響,所以,一個執行緒佔有寫鎖後是可以再佔有讀鎖的,只是這個時候其它執行緒依然無法獲取讀鎖。
如果你仔細思考上面的邏輯,你會發現一個執行緒先佔有讀鎖後佔有寫鎖,會有一個很大的問題——鎖無法被釋放也無法被獲取了。這個執行緒先佔有了讀鎖,然後自己再佔有寫鎖的時候會阻塞,然後它就自己把自己搞死了,進而把其它執行緒也搞死了,它無法釋放鎖,其它執行緒也無法獲得鎖了。
這是死鎖嗎?似乎不是,死鎖的定義是執行緒A
佔有著執行緒B
需要的資源,執行緒B
佔有著執行緒A
需要的資源,兩個執行緒相互等待對方釋放資源,經典的死鎖例子如下:
Object a = new Object();
Object b = new Object();
new Thread(()->{
synchronized (a) {
LockSupport.parkNanos(1000000);
synchronized (b) {
}
}
}).start();
new Thread(()->{
synchronized (b) {
synchronized (a) {
}
}
}).start();
簡單的死鎖用jstack
是可以看到的:
"Thread-1":
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$1(ReentrantReadWriteLockTest.java:40)
- waiting to lock <0x000000076baa9068> (a java.lang.Object)
- locked <0x000000076baa9078> (a java.lang.Object)
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$2/1831932724.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"Thread-0":
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest.lambda$main$0(ReentrantReadWriteLockTest.java:32)
- waiting to lock <0x000000076baa9078> (a java.lang.Object)
- locked <0x000000076baa9068> (a java.lang.Object)
at com.coolcoding.code.synchronize.ReentrantReadWriteLockTest$$Lambda$1/1096979270.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
- 如何使用ReentrantReadWriteLock實現一個高效安全的TreeMap?
class SafeTreeMap {
private final Map<String, Object> m = new TreeMap<String, Object>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Object get(String key) {
readLock.lock();
try {
return m.get(key);
} finally {
readLock.unlock();
}
}
public Object put(String key, Object value) {
writeLock.lock();
try {
return m.put(key, value);
} finally {
writeLock.unlock();
}
}
}