ReentrantReadWriteLock 原始碼分析
阿新 • • 發佈:2018-12-01
ReentrantReadWriteLock
1)獲取順序: 非公平模式(預設):連續競爭的非公平鎖可能無限期地推遲一個或多個 reader 或 writer 執行緒,但吞吐量通常要高於公平鎖。 公平模式:當某個執行緒釋放當前保持的鎖時,可以為等待時間最長的單個 writer 執行緒分配寫入鎖,如果有一組等待時間大於所有正在等待的 writer 執行緒的 reader 執行緒,則將為該組分配讀取鎖。 2)重入:此鎖允許 reader 和 writer 按照 ReentrantLock 的樣式重新獲取讀取鎖或寫入鎖。 3)鎖降級:重入還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。 4)鎖獲取的中斷:讀取鎖和寫入鎖都支援鎖獲取期間的中斷。 5)同步狀態值的高 16 位儲存讀取鎖被持有的次數,低 16 位儲存寫入鎖被持有的次數。
建立例項
/** 內部類實現的讀鎖 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 內部類實現的寫鎖 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** 實現鎖操作的同步器 */ final Sync sync; /** * 建立一個非公平的讀寫鎖例項 */ public ReentrantReadWriteLock() { this(false); } /** * 1)fair=true,建立一個公平的讀寫鎖例項。 * 2)fair=false,建立一個非公平的讀寫鎖例項。 */ public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); // 基於當期例項建立讀鎖和寫鎖 readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
讀鎖獲取:ReadLock#lock
/** * 1)如果寫鎖沒有被其他執行緒持有,則成功獲取讀鎖並返回 * 2)寫鎖被其他執行緒持有,則進入阻塞狀態。 */ @Override public void lock() { sync.acquireShared(1); } AbstractQueuedSynchronizer#acquireShared /** * 在共享模式下獲取鎖,忽略執行緒中斷。 */ public final void acquireShared(int arg) { // 嘗試獲取共享鎖 if (tryAcquireShared(arg) < 0) { // 再次獲取共享鎖 doAcquireShared(arg); } } ReentrantReadWriteLock#Sync abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; /** * 高 16 位記錄寫鎖持有次數,低 16 位記錄讀鎖持有次數 */ static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = 1 << Sync.SHARED_SHIFT; static final int MAX_COUNT = (1 << Sync.SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << Sync.SHARED_SHIFT) - 1; /** 讀鎖被持有的計數值 */ static int sharedCount(int c) { return c >>> Sync.SHARED_SHIFT; } /** 寫鎖被持有的計數值 */ static int exclusiveCount(int c) { return c & Sync.EXCLUSIVE_MASK; } /** * 每個執行緒的讀鎖保持計數器 */ static final class HoldCounter { int count; // initially 0 // Use id, not reference, to avoid garbage retention final long tid = LockSupport.getThreadId(Thread.currentThread()); } /** * 持有讀鎖計數器的執行緒區域性物件 */ static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { @Override public HoldCounter initialValue() { return new HoldCounter(); } } /** * 當前執行緒持有的讀鎖計數器物件,在建立時初始化,當讀鎖釋放時移除 */ private transient ThreadLocalHoldCounter readHolds; /** * 最近一個成功獲取讀鎖的執行緒的讀鎖持有計數器 */ private transient HoldCounter cachedHoldCounter; /** * 第一個獲取讀鎖的執行緒 */ private transient Thread firstReader; /** * 第一個獲取讀鎖的執行緒持有讀鎖的計數值 */ private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } /** * 非公平同步器 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; @Override boolean writerShouldBlock() { return false; // writers can always barge } @Override boolean readerShouldBlock() { // 避免獲取寫鎖的執行緒飢餓 return apparentlyFirstQueuedIsExclusive(); } } AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive /** * 同步佇列中第一個等待獲取鎖的執行緒,需要獲取獨佔的寫鎖,則返回 true */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; // 頭結點、第二個節點都不為 null,節點處於獨佔模式,並且節點上有駐留執行緒,表示有執行緒在等待獲取寫鎖。 return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } ReentrantReadWriteLock#Sync#tryAcquireShared @Override @ReservedStackAccess protected final int tryAcquireShared(int unused) { // 讀取當前執行緒 final Thread current = Thread.currentThread(); // 讀取同步狀態 final int c = getState(); /** * 1)如果寫鎖已經被執行緒持有,並且不是當前執行緒,則獲取失敗,返回 -1。 */ if (Sync.exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) { return -1; } /** * 2)寫鎖未被任何執行緒持有,或寫鎖被當前執行緒持有。 */ // 讀取讀鎖計數值 final int r = Sync.sharedCount(c); /** * 獲取讀鎖的執行緒是否應該被阻塞【同步佇列第一個阻塞執行緒在等待獲取寫鎖】 * && 讀鎖的佔用計數值 < 65535 * && 比較設定讀鎖的新計數值 */ if (!readerShouldBlock() && r < Sync.MAX_COUNT && compareAndSetState(c, c + Sync.SHARED_UNIT)) { // 1)如果是第一個獲取讀鎖的執行緒 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; // 2)當前執行緒是第一個獲取讀鎖的執行緒,並在重複獲取讀鎖 } else if (firstReader == current) { // 累積讀鎖持有計數值 firstReaderHoldCount++; // 3)當前執行緒不是第一個獲取讀鎖的執行緒 } else { // 讀取最近一個成功獲取讀鎖的執行緒的讀鎖持有計數器 HoldCounter rh = cachedHoldCounter; // 如果最近獲取讀鎖的執行緒不是當前執行緒 if (rh == null || rh.tid != LockSupport.getThreadId(current)) { // 獲取當前執行緒的持有計數器 cachedHoldCounter = rh = readHolds.get(); } else if (rh.count == 0) { readHolds.set(rh); } // 遞增計數值 rh.count++; } return 1; } // 寫鎖已經被當前執行緒持有,正在獲取讀鎖 return fullTryAcquireShared(current); } /** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */ final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { // 讀取同步狀態 final int c = getState(); // 1)有執行緒持有寫鎖 if (Sync.exclusiveCount(c) != 0) { // 如果不是當前執行緒持有,則返回 -1 if (getExclusiveOwnerThread() != current) { return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } // 2)寫鎖沒有被任何執行緒持有 } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) { rh = readHolds.get(); // 讀鎖持有計數值為 0,則移除計數器 if (rh.count == 0) { readHolds.remove(); } } } if (rh.count == 0) { return -1; } } } // 讀鎖的獲取計數值已經為最大值 if (Sync.sharedCount(c) == Sync.MAX_COUNT) { throw new Error("Maximum lock count exceeded"); } // 比較更新讀鎖的計數值 if (compareAndSetState(c, c + Sync.SHARED_UNIT)) { // 1)當前執行緒是第一個獲取讀鎖的執行緒 if (Sync.sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; // 2)當前執行緒是第一個獲取讀鎖的執行緒,重複獲取 } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) { rh = cachedHoldCounter; } if (rh == null || rh.tid != LockSupport.getThreadId(current)) { rh = readHolds.get(); } else if (rh.count == 0) { readHolds.set(rh); } rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } AbstractQueuedSynchronizer#doAcquireShared /** * 在共享模式下獲取鎖,執行緒能響應中斷 */ private void doAcquireShared(int arg) { // 建立一個共享節點 final Node node = addWaiter(Node.SHARED); boolean interrupted = false; try { for (;;) { // 讀取前置節點 final Node p = node.predecessor(); // 前置節點是頭節點 if (p == head) { // 嘗試獲取鎖 final int r = tryAcquireShared(arg); /** * 第一個等待獲取寫鎖的執行緒已經成功獲取寫鎖,並且已經使用完畢而釋放, * 當前執行緒成功獲取讀鎖。 */ if (r >= 0) { // 設定當前節點為新的頭節點,並嘗試將訊號往後傳播,喚醒等待獲取讀鎖的執行緒 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } // 當前節點的駐留執行緒需要被阻塞,則阻塞當前執行緒 if (AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(p, node)) { interrupted |= parkAndCheckInterrupt(); } } } catch (final Throwable t) { cancelAcquire(node); throw t; } finally { if (interrupted) { AbstractQueuedSynchronizer.selfInterrupt(); } } } AbstractQueuedSynchronizer#setHeadAndPropagate /** * 設定頭節點,如果同步佇列中有等待獲取讀鎖的執行緒,則嘗試喚醒 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 讀取當前節點的後置節點,如果其為 null 或處於共享模式【等待獲取讀鎖】 final Node s = node.next; if (s == null || s.isShared()) { // 嘗試釋放後繼節點 doReleaseShared(); } } } AbstractQueuedSynchronizer#doReleaseShared /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { for (;;) { // 讀取頭節點 final Node h = head; // 頭結點和尾節點不相等,表示有執行緒在等待獲取鎖 if (h != null && h != tail) { // 讀取頭節點的同步狀態 final int ws = h.waitStatus; // 後繼節點需要被喚醒 if (ws == Node.SIGNAL) { // 比較更新同步狀態 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) { // 更新失敗則再次確認 continue; // loop to recheck cases } // 同步狀態成功更新為 0,則喚醒後繼節點的駐留執行緒 unparkSuccessor(h); } // 同步狀態為 0,並且比較更新為 PROPAGATE 失敗,則繼續迴圈 else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) { continue; // loop on failed CAS } } /** * 1)後繼節點被喚醒並且還未成功獲取到鎖,則直接退出迴圈,此時只喚醒了一個執行緒。 * 2)被喚醒的後繼節點成功獲取到讀鎖,駐留執行緒已經被釋放,此時頭節點已經改變,則進行重試。 */ if (h == head) { break; } } }
讀鎖釋放:ReadLock#unlock
/**
* 釋放讀鎖,如果讀鎖的持有計數值為 0,則寫鎖可以被獲取。
*/
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
@ReservedStackAccess
protected final boolean tryReleaseShared(int unused) {
// 讀取當前執行緒
final Thread current = Thread.currentThread();
// 1)當前執行緒是獲取讀鎖的第一個執行緒
if (firstReader == current) {
// 讀鎖持有計數為 1,則釋放後為 0,
if (firstReaderHoldCount == 1) {
firstReader = null;
} else {
// 讀鎖被當前執行緒多次持有,則遞減讀鎖持有計數值
firstReaderHoldCount--;
}
// 2)當前執行緒不是持有讀鎖的執行緒
} else {
// 最近持有讀鎖的執行緒計數值
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
// 讀取當前執行緒的讀鎖持計數值物件
rh = readHolds.get();
}
final int count = rh.count;
if (count <= 1) {
// 如果為 1,則移除執行緒區域性物件
readHolds.remove();
if (count <= 0) {
throw Sync.unmatchedUnlockException();
}
}
// 遞減計數值
--rh.count;
}
// 原子更新同步狀態值
for (;;) {
final int c = getState();
final int nextc = c - Sync.SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
}
寫鎖獲取:WriteLock#lock
ReentrantReadWriteLock#Sync
@Override
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 讀取同步狀態值
final int c = getState();
// 寫鎖被持有的計數值
final int w = Sync.exclusiveCount(c);
// 1)讀鎖或寫鎖至少有一個被執行緒持有
if (c != 0) {
/**
* 2)讀鎖被執行緒持有,或當前執行緒不是寫鎖持有執行緒。
* 目標執行緒不允許先獲取讀鎖,後獲取寫鎖,即 ReentrantReadWriteLock 不支援鎖升級。
*/
if (w == 0 || current != getExclusiveOwnerThread()) {
// 嘗試獲取失敗
return false;
}
// 寫鎖重入次數超出最大值
if (w + Sync.exclusiveCount(acquires) > Sync.MAX_COUNT) {
throw new Error("Maximum lock count exceeded");
}
// 更新同步狀態,寫鎖獲取成功
setState(c + acquires);
return true;
}
// 讀鎖和寫鎖都未被執行緒持有,則原子更新同步狀態
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) {
return false;
}
// 設定寫鎖的獨佔執行緒為當前執行緒
setExclusiveOwnerThread(current);
return true;
}
寫鎖釋放:WriteLock#unlock
/**
* 釋放鎖,寫鎖已經被釋放,則返回 true
*/
@Override
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 寫鎖是否被當前執行緒持有
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 計算同步狀態值,寫鎖計數值儲存在低 16 位
final int nextc = getState() - releases;
// 新的寫鎖計數值為 0,則表示讀寫鎖已經自由了
final boolean free = Sync.exclusiveCount(nextc) == 0;
if (free) {
// 清空獨佔鎖持有執行緒
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}