ReentrantReadWriteLock 實現原理
ReentrantReadWriteLock 簡介
重入鎖 ReentrantLock 是排他鎖,排他鎖在同一時刻僅有一個執行緒可以進行訪問,但是在大多數場景下,大部分時間都是提供讀服務,而寫服務佔有的時間較少。然而,讀服務不存在資料競爭問題,如果一個執行緒在讀時禁止其他執行緒讀勢必會導致效能降低。所以就提供了讀寫鎖。
讀寫鎖維護著一對鎖,一個讀鎖和一個寫鎖。通過分離讀鎖和寫鎖,使得併發性比一般的排他鎖有了較大的提升:
在同一時間,可以允許多個讀執行緒同時訪問。
但是,在寫執行緒訪問時,所有讀執行緒和寫執行緒都會被阻塞。
複製程式碼
讀寫鎖的主要特性:
公平性:支援公平性和非公平性。
重入性:支援重入。讀寫鎖最多支援 65535 個遞迴寫入鎖和 65535 個遞迴讀取鎖。
鎖降級:遵循獲取寫鎖,再獲取讀鎖,最後釋放寫鎖的次序,如此寫鎖能夠降級成為讀鎖。
複製程式碼
ReadWriteLock
java.util.concurrent.locks.ReadWriteLock ,讀寫鎖介面。定義方法如下:
Lock readLock();
Lock writeLock();
複製程式碼
一對方法,分別獲得讀鎖和寫鎖 Lock 物件。
ReentrantReadWriteLock
java.util.concurrent.locks.ReentrantReadWriteLock ,實現 ReadWriteLock 介面,可重入的讀寫鎖實現類。在它內部,維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 Writer 執行緒,讀取鎖可以由多個 Reader 執行緒同時保持。
也就說說,寫鎖是獨佔的,讀鎖是共享的。
ReentrantReadWriteLock 類的大體結構如下:
/** 內部類 讀鎖 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 內部類 寫鎖 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用預設(非公平)的排序屬性建立一個新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this (false);
}
/** 使用給定的公平策略建立一個新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用於寫入操作的鎖 */
@Override
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用於讀取操作的鎖 */
@Override
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 省略其餘原始碼
*/
}
public static class WriteLock implements Lock,java.io.Serializable {
/**
* 省略其餘原始碼
*/
}
public static class ReadLock implements Lock,java.io.Serializable {
/**
* 省略其餘原始碼
*/
}
複製程式碼
ReentrantReadWriteLock 與 ReentrantLock一樣,其鎖主體也是 Sync,它的讀鎖、寫鎖都是通過 Sync 來實現的。所以 ReentrantReadWriteLock 實際上只有一個鎖,只是在獲取讀取鎖和寫入鎖的方式上不一樣。
它的讀寫鎖對應兩個類:ReadLock 和 WriteLock 。這兩個類都是 Lock 的子類實現。
在 ReentrantLock 中,使用 Sync ( 實際是 AQS )的 int 型別的 state 來表示同步狀態,表示鎖被一個執行緒重複獲取的次數。但是,讀寫鎖 ReentrantReadWriteLock 內部維護著一對讀寫鎖,如果要用一個變數維護多種狀態,需要採用“按位切割使用”的方式來維護這個變數,將其切分為兩部分:高16為表示讀,低16為表示寫。
分割之後,讀寫鎖是如何迅速確定讀鎖和寫鎖的狀態呢?通過位運算。假如當前同步狀態為S,那麼:
寫狀態,等於 S & 0x0000FFFF(將高 16 位全部抹去)
讀狀態,等於 S >>> 16 (無符號補 0 右移 16 位)。
複製程式碼
程式碼如下:
// Sync.java
static final int SHARED_SHIFT = 16; // 位數
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 每個鎖的最大重入次數,65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
複製程式碼
exclusiveCount(int c) 靜態方法,獲得持有寫狀態的鎖的次數。
sharedCount(int c) 靜態方法,獲得持有讀狀態的鎖的執行緒數量。不同於寫鎖,讀鎖可以同時被多個執行緒持有。而每個執行緒持有的讀鎖支援重入的特性,所以需要對每個執行緒持有的讀鎖的數量單獨計數,這就需要用到 HoldCounter 計數器。
構造方法
在上面的構造方法中,我們已經看到基於 fair 引數,建立 建立 FairSync 還是 NonfairSync 物件。
getThreadId
getThreadId(Thread thread) 靜態方法,獲得執行緒編號。程式碼如下:
/**
* Returns the thread id for the given thread. We must access
* this directly rather than via method Thread.getId() because
* getId() is not final,and has been known to be overridden in
* ways that do not preserve unique mappings.
*/
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread,TID_OFFSET);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long TID_OFFSET;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
複製程式碼
按道理說,直接呼叫執行緒對應的 Thread#getId() 方法,程式碼如下:
private long tid;
public long getId() {
return tid;
}
複製程式碼
但是實際上,Thread 的這個方法是非 final 修飾的,也就是說,如果我們有實現 Thread 的子類,完全可以覆寫這個方法,所以可能導致無法獲得 tid 屬性。因此上面的方法,使用 Unsafe 直接獲得 tid 屬性。不愧是 JDK 的原始碼,細思極恐。
另外,JDK-6346938 也討論了 “java.lang.Thread.getId() should be final” 這個問題,目前已經被 JDK 認為是一個 BUG ,但是不造為什麼一直沒修復。
其他實現方法
public final boolean isFair() {
return sync instanceof FairSync;
}
public int getReadLockCount() {
return sync.getReadLockCount();
}
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
public boolean isWriteLockedByCurrentThread() {
return sync.isHeldExclusively();
}
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
protected Collection<Thread> getQueuedWriterThreads() {
return sync.getExclusiveQueuedThreads();
}
protected Collection<Thread> getQueuedReaderThreads() {
return sync.getSharedQueuedThreads();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
複製程式碼
讀鎖和寫鎖
在上文中,我們也提到,ReentrantReadWriteLock 的讀鎖和寫鎖,基於它內部的 Sync 實現,所以具體的實現方法,就是對內部的 Sync 的方法的呼叫。
ReadLock
ReadLock 是 ReentrantReadWriteLock 的內部靜態類,實現 java.util.concurrent.locks.Lock 介面,讀鎖實現類。
構造方法
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
複製程式碼
sync 欄位,通過 ReentrantReadWriteLock 的構造方法,傳入並使用它的 Sync 物件。
lock
@Override
public void lock() {
sync.acquireShared(1);
}
複製程式碼
呼叫 AQS 的 #acquireShared(int arg) 方法,共享式獲得同步狀態。所以,讀鎖可以同時被多個執行緒獲取。
lockInterruptibly
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
複製程式碼
tryLock
/**
* Acquires the read lock only if the write lock is not held by
* another thread at the time of invocation.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately with the value
* {@code true}. Even when this lock has been set to use a
* fair ordering policy,a call to {@code tryLock()}
* <em>will</em> immediately acquire the read lock if it is
* available,whether or not other threads are currently
* waiting for the read lock. This "barging" behavior
* can be useful in certain circumstances,even though it
* breaks fairness. If you want to honor the fairness setting
* for this lock,then use {@link #tryLock(long,TimeUnit)
* tryLock(0,TimeUnit.SECONDS) } which is almost equivalent
* (it also detects interruption).
*
* <p>If the write lock is held by another thread then
* this method will return immediately with the value
* {@code false}.
*
* @return {@code true} if the read lock was acquired
*/
@Override
public boolean tryLock() {
return sync.tryReadLock();
}
複製程式碼
實際上,原因和ReentrantLock 的 tryLock 相同。
tryLock() 實現方法,在實現時,希望能快速的獲得是否能夠獲得到鎖,因此即使在設定為 fair = true ( 使用公平鎖 ),依然呼叫 Sync#tryReadLock() 方法。
如果真的希望 #tryLock() 還是按照是否公平鎖的方式來,可以呼叫 #tryLock(0,TimeUnit) 方法來實現。
tryLock
@Override
public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1,unit.toNanos(timeout));
}
複製程式碼
unlock
@Override
public void unlock() {
sync.releaseShared(1);
}
複製程式碼
呼叫 AQS 的 #releaseShared(int arg) 方法,共享式釋放同步狀態。
newCondition
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
複製程式碼
不支援 Condition 條件。
WriteLock
WriteLock 的程式碼,類似 ReadLock 的程式碼,差別在於獨佔式獲取同步狀態。
WriteLock 是 ReentrantReadWriteLock 的內部靜態類,實現 java.util.concurrent.locks.Lock 介面,寫鎖實現類。
構造方法
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
複製程式碼
sync 欄位,通過 ReentrantReadWriteLock 的構造方法,傳入並使用它的 Sync 物件。
lock
@Override
public void lock() {
sync.acquire(1);
}
複製程式碼
呼叫 AQS 的 #.acquire(int arg) 方法,獨佔式獲得同步狀態。所以,寫鎖只能同時被一個執行緒獲取。
lockInterruptibly
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
複製程式碼
tryLock
/**
* Acquires the write lock only if it is not held by another thread
* at the time of invocation.
*
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately with the value {@code true},* setting the write lock hold count to one. Even when this lock has
* been set to use a fair ordering policy,a call to
* {@code tryLock()} <em>will</em> immediately acquire the
* lock if it is available,whether or not other threads are
* currently waiting for the write lock. This "barging"
* behavior can be useful in certain circumstances,even
* though it breaks fairness. If you want to honor the
* fairness setting for this lock,then use {@link
* #tryLock(long,TimeUnit) tryLock(0,TimeUnit.SECONDS) }
* which is almost equivalent (it also detects interruption).
*
* <p>If the current thread already holds this lock then the
* hold count is incremented by one and the method returns
* {@code true}.
*
* <p>If the lock is held by another thread then this method
* will return immediately with the value {@code false}.
*
* @return {@code true} if the lock was free and was acquired
* by the current thread,or the write lock was already held
* by the current thread; and {@code false} otherwise.
*/
@Override
public boolean tryLock( ) {
return sync.tryWriteLock();
}
複製程式碼
實際上,和 ReentrantLock的 tryLoc 相同。
tryLock() 實現方法,在實現時,希望能快速的獲得是否能夠獲得到鎖,因此即使在設定為 fair = true ( 使用公平鎖 ),依然呼叫 Sync#tryWriteLock() 方法。 如果真的希望 #tryLock() 還是按照是否公平鎖的方式來,可以呼叫 #tryLock(0,TimeUnit) 方法來實現。
tryLock
@Override
public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(timeout));
}
複製程式碼
unlock
@Override
public void unlock() {
sync.release(1);
}
複製程式碼
呼叫 AQS 的 #release(int arg) 方法,獨佔式釋放同步狀態。
newCondition
@Override
public Condition newCondition() {
return sync.newCondition();
}
複製程式碼
呼叫 Sync#newCondition() 方法,建立 Condition 物件。
isHeldByCurrentThread
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
複製程式碼
呼叫 Sync#isHeldExclusively() 方法,判斷是否被當前執行緒獨佔鎖。
getHoldCount
public int getHoldCount() {
return sync.getWriteHoldCount();
}
複製程式碼
呼叫 Sync#getWriteHoldCount() 方法,返回當前執行緒獨佔鎖的持有數量。
Sync 抽象類
Sync 是 ReentrantReadWriteLock 的內部靜態類,實現 AbstractQueuedSynchronizer 抽象類,同步器抽象類。它使用 AQS 的 state 欄位,來表示當前鎖的持有數量,從而實現可重入和讀寫鎖的特性。
構造方法
private transient ThreadLocalHoldCounter readHolds; // 當前執行緒的讀鎖持有數量
private transient Thread firstReader = null; // 第一個獲取讀鎖的執行緒
private transient int firstReaderHoldCount; // 第一個獲取讀鎖的重入數
private transient HoldCounter cachedHoldCounter; // 最後一個獲得讀鎖的執行緒的 HoldCounter 的快取物件
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
複製程式碼
writerShouldBlock
abstract boolean writerShouldBlock();
獲取寫鎖時,如果有前序節點也獲得鎖時,是否阻塞。NonefairSync 和 FairSync 下有不同的實現。
readerShouldBlock
abstract boolean readerShouldBlock();
獲取讀鎖時,如果有前序節點也獲得鎖時,是否阻塞。NonefairSync 和 FairSync 下有不同的實現。
【寫鎖】tryAcquire
@Override
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//當前鎖個數
int c = getState();
//寫鎖
int w = exclusiveCount(c);
if (c != 0) {
//c != 0 && w == 0 表示存在讀鎖
//當前執行緒不是已經獲取寫鎖的執行緒
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//超出最大範圍
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
// 是否需要阻塞
if (writerShouldBlock() ||
!compareAndSetState(c,c + acquires))
return false;
//設定獲取鎖的執行緒為當前執行緒
setExclusiveOwnerThread(current);
return true;
}
複製程式碼
該方法和 ReentrantLock 的 #tryAcquire(int arg) 大致一樣,差別在判斷重入時,增加了一項條件:讀鎖是否存在。因為要確保寫鎖的操作對讀鎖是可見的。如果在存在讀鎖的情況下允許獲取寫鎖,那麼那些已經獲取讀鎖的其他執行緒可能就無法感知當前寫執行緒的操作。因此只有等讀鎖完全釋放後,寫鎖才能夠被當前執行緒所獲取,一旦寫鎖獲取了,所有其他讀、寫執行緒均會被阻塞。
呼叫 #writerShouldBlock() 抽象方法,若返回 true ,則獲取寫鎖失敗。
【讀鎖】tryAcquireShared
tryAcqurireShared(int arg) 方法,嘗試獲取讀同步狀態,獲取成功返回 >= 0 的結果,否則返回 < 0 的結果。程式碼如下:
protected final int tryAcquireShared(int unused) {
//當前執行緒
Thread current = Thread.currentThread();
int c = getState();
//exclusiveCount(c)計算寫鎖
//如果存在寫鎖,且鎖的持有者不是當前執行緒,直接返回-1
//存在鎖降級問題,後續闡述
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//讀鎖
int r = sharedCount(c);
/*
* readerShouldBlock():讀鎖是否需要等待(公平鎖原則)
* r < MAX_COUNT:持有執行緒小於最大數(65535)
* compareAndSetState(c,c + SHARED_UNIT):設定讀取鎖狀態
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c,c + SHARED_UNIT)) { //修改高16位的狀態,所以要加上2^16
/*
* holdCount部分後面講解
*/
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
複製程式碼
讀鎖獲取的過程相對於獨佔鎖而言會稍微複雜下,整個過程如下:
- 因為存在鎖降級情況,如果存在寫鎖且鎖的持有者不是當前執行緒,則直接返回失敗,否則繼續。
- 依據公平性原則,呼叫 readerShouldBlock() 方法來判斷讀鎖是否不需要阻塞,讀鎖持有執行緒數小於最大值(65535),且 CAS 設定鎖狀態成
fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
// 鎖降級
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
// 讀鎖需要阻塞,判斷是否當前執行緒已經獲取到讀鎖
else if (readerShouldBlock()) {
//列頭為當前執行緒
if (firstReader == current) {
}
//HoldCounter後面講解
else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0) // 計數為 0 ,說明沒得到讀鎖,清空執行緒變數
readHolds.remove();
}
}
if (rh.count == 0) // 說明沒得到讀鎖
return -1;
}
}
//讀鎖超出最大範圍
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS設定讀鎖成功
if (compareAndSetState(c,c + SHARED_UNIT)) { //修改高16位的狀態,所以要加上2^16
//如果是第1次獲取“讀取鎖”,則更新firstReader和firstReaderHoldCount
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
//如果想要獲取鎖的執行緒(current)是第1個獲取鎖(firstReader)的執行緒,則將firstReaderHoldCount+1
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
//更新執行緒的獲取“讀取鎖”的共享計數
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
複製程式碼
該方法會根據“是否需要阻塞等待”,“讀取鎖的共享計數是否超過限制”等等進行處理。如果不需要阻塞等待,並且鎖的共享計數沒有超過限制,則通過 CAS 嘗試獲取鎖,並返回 1 。所以,fullTryAcquireShared(Thread) 方法,是 #tryAcquireShared(int unused) 方法的自旋重試的邏輯。
【寫鎖】tryRelease
protected final boolean tryRelease(int releases) {
//釋放的執行緒不為鎖的持有者
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//若寫鎖的新執行緒數為0,則將鎖的持有者設定為null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
複製程式碼
寫鎖釋放鎖的整個過程,和獨佔鎖 ReentrantLock 相似,每次釋放均是減少寫狀態,當寫狀態為 0 時,表示寫鎖已經完全釋放了,從而讓等待的其他執行緒可以繼續訪問讀、寫鎖,獲取同步狀態。同時,此次寫執行緒的修改對後續的執行緒可見。
【讀鎖】tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果想要釋放鎖的執行緒為第一個獲取鎖的執行緒
if (firstReader == current) {
//僅獲取了一次,則需要將firstReader 設定null,否則 firstReaderHoldCount - 1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
//獲取rh物件,並更新“當前執行緒獲取鎖的資訊”
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//CAS更新同步狀態
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c,nextc))
return nextc == 0;
}
}
複製程式碼
unmatchedUnlockException() 方法,返回 IllegalMonitorStateException 異常。程式碼如下:
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock,not locked by current thread");
}
複製程式碼
出現的情況是,unlock 讀鎖的執行緒,非獲得讀鎖的執行緒。正常使用的情況,不會出現該情況。
tryWriteLock
tryWriteLock() 方法,嘗試獲取寫鎖。
- 若獲取成功,返回 true 。
- 若失敗,返回 false 即可,不進行等待排隊。
程式碼如下:
final boolean tryWriteLock(){
Thread current = Thread.currentThread();
int c = getState();
if(c != 0){
int w = exclusiveCount(c); // 獲得現在寫鎖獲取的數量
if(w == 0 || current != getExclusiveOwnerThread()){ // 判斷是否是其他的執行緒獲取了寫鎖。若是,返回 false
return false;
}
if(w == MAX_COUNT){ // 超過寫鎖上限,丟擲 Error 錯誤
throw new Error("Maximum lock count exceeded");
}
}
if(!compareAndSetState(c,c + 1)){ // CAS 設定同步狀態,嘗試獲取寫鎖。若失敗,返回 false
return false;
}
setExclusiveOwnerThread(current); // 設定持有寫鎖為當前執行緒
return true;
}
複製程式碼
tryReadLock
tryReadLock() 方法,嘗試獲取讀鎖。
若獲取成功,返回 true 。 若失敗,返回 false 即可,不進行等待排隊。 程式碼如下:
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
//exclusiveCount(c)計算寫鎖
//如果存在寫鎖,且鎖的持有者不是當前執行緒,直接返回-1
//存在鎖降級問題,後續闡述
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
// 讀鎖
int r = sharedCount(c);
/*
* HoldCount 部分後面講解
*/
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c,c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
複製程式碼
isHeldExclusively
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
複製程式碼
newCondition
final ConditionObject newCondition() {
return new ConditionObject();
}
複製程式碼
其他實現方法
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
final int getReadLockCount() {
return sharedCount(getState());
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
/**
* Reconstitutes the instance from a stream (that is,deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException,ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
final int getCount() { return getState();
}
複製程式碼
Sync 實現類
NonfairSync
NonfairSync 是 ReentrantReadWriteLock 的內部靜態類,實現 Sync 抽象類,非公平鎖實現類。程式碼如下:
static final class NonfairSync extends Sync {
@Override
final boolean writerShouldBlock() {
return false; // writers can always barge
}
@Override
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,* block if the thread that momentarily appears to be head
* of queue,if one exists,is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
複製程式碼
因為寫鎖是獨佔排它鎖,所以在非公平鎖的情況下,需要呼叫 AQS 的 apparentlyFirstQueuedIsExclusive() 方法,判斷是否當前寫鎖已經被獲取。程式碼如下:
final boolean apparentlyFirstQueuedIsExclusive() {
Node h,s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() && // 非共享,即獨佔
s.thread != null;
}
複製程式碼
FairSync
FairSync 是 ReentrantReadWriteLock 的內部靜態類,實現 Sync 抽象類,公平鎖實現類。程式碼如下:
static final class FairSync extends Sync {
@Override
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
@Override
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
複製程式碼
呼叫 AQS 的 #hasQueuedPredecessors() 方法,是否有前序節點,即自己不是首個等待獲取同步狀態的節點。
HoldCounter
在讀鎖獲取鎖和釋放鎖的過程中,我們一直都可以看到一個變數 rh (HoldCounter ),該變數在讀鎖中扮演著非常重要的作用。
我們瞭解讀鎖的內在機制其實就是一個共享鎖,為了更好理解 HoldCounter ,我們暫且認為它不是一個鎖的概率,而相當於一個計數器。一次共享鎖的操作就相當於在該計數器的操作。獲取共享鎖,則該計數器 + 1,釋放共享鎖,該計數器 - 1。只有當執行緒獲取共享鎖後才能對共享鎖進行釋放、重入操作。所以 HoldCounter 的作用就是當前執行緒持有共享鎖的數量,這個數量必須要與執行緒繫結在一起,否則操作其他執行緒鎖就會丟擲異常。
HoldCounter 是 Sync 的內部靜態類。
static final class HoldCounter {
int count = 0; // 計數器
final long tid = getThreadId(Thread.currentThread()); // 執行緒編號
}
複製程式碼
ThreadLocalHoldCounter 是 Sync 的內部靜態類。
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
@Override
public HoldCounter initialValue() {
return new HoldCounter();
}
}
複製程式碼
通過 ThreadLocalHoldCounter 類,HoldCounter 就可以與執行緒進行綁定了。故而,HoldCounter 應該就是繫結執行緒上的一個計數器,而 ThreadLocalHoldCounter 則是執行緒繫結的 ThreadLocal。
從上面我們可以看到 ThreadLocal 將 HoldCounter 繫結到當前執行緒上,同時 HoldCounter 也持有執行緒編號,這樣在釋放鎖的時候才能知道 ReadWriteLock 裡面快取的上一個讀取執行緒(cachedHoldCounter)是否是當前執行緒。這樣做的好處是可以減少ThreadLocal.get() 方法的次呼叫數,因為這也是一個耗時操作。
需要說明的是這樣 HoldCounter 繫結執行緒編號而不繫結執行緒物件的原因是,避免 HoldCounter 和 ThreadLocal 互相繫結而導致 GC 難以釋放它們(儘管 GC 能夠智慧的發現這種引用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助 GC 快速回收物件而已。
看到這裡我們明白了 HoldCounter 作用了,我們在看一個獲取讀鎖的程式碼段:
//如果獲取讀鎖的執行緒為第一次獲取讀鎖的執行緒,則firstReaderHoldCount重入數 + 1
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//非firstReader計數
if (rh == null)
rh = cachedHoldCounter;
//rh == null 或者 rh.tid != current.getId(),需要獲取rh
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
//加入到readHolds中
else if (rh.count == 0)
readHolds.set(rh);
//計數+1
rh.count++;
cachedHoldCounter = rh; // cache for release
}
複製程式碼
這裡解釋下為何要引入 firstReader、firstReaderHoldCount 變數。這是為了一個效率問題,firstReader 是不會放入到 readHolds 中的,如果讀鎖僅有一個的情況下,就會避免查詢 readHolds 。
鎖降級
讀寫鎖有一個特性就是鎖降級。鎖降級就意味著寫鎖是可以降級為讀鎖的,但是需要遵循先獲取寫鎖、獲取讀鎖再釋放寫鎖的次序。注意如果當前執行緒先獲取寫鎖,然後釋放寫鎖,再獲取讀鎖這個過程不能稱之為鎖降級,鎖降級一定要遵循那個次序。
在獲取讀鎖的方法 #tryAcquireShared(int unused) 中,有一段程式碼就是來判讀鎖降級的:
int c = getState();
//exclusiveCount(c)計算寫鎖
//如果存在寫鎖,且鎖的持有者不是當前執行緒,直接返回-1
//存在鎖降級問題,後續闡述
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//讀鎖
int r = sharedCount(c);
複製程式碼
鎖降級中讀鎖的獲取釋放為必要?肯定是必要的。試想,假如當前執行緒 A 不獲取讀鎖而是直接釋放了寫鎖,這個時候另外一個執行緒 B 獲取了寫鎖,那麼這個執行緒 B 對資料的修改是不會對當前執行緒 A 可見的。如果獲取了讀鎖,則執行緒B在獲取寫鎖過程中判斷如果有讀鎖還沒有釋放則會被阻塞,只有當前執行緒 A 釋放讀鎖後,執行緒 B 才會獲取寫鎖成功。