Java 併發程式設計(五)讀寫鎖
本文使用的 JDK 版本為 JDK 8
JUC
中關於讀寫鎖的介面定義如下:
// java.util.concurrent.locks.ReadWriteLock
public interface ReadWriteLock {
// 返回一個讀鎖
Lock readLock();
// 返回一個寫鎖
Lock writeLock();
}
在 JUC
中,常用的具體實現為 ReentrantReadWriteLock
,因此,在這裡以 ReentrantReadWriteLock
為例來介紹讀寫鎖的相關內容。
基本使用
讀寫鎖的一個常用的使用場景就是對於資料的讀取操作,在大部分的業務場景下,發生讀的情況要比發生寫的概率要高很多。在這種情況,可以針對熱點資料進行快取,從而提高系統的響應效能。
使用示例如下:
// 該程式碼來源於 JDK 的官方文件,稍作了一點修改 class CachedData { Object data; volatile boolean cacheValid; final DataAccess access; final Order order; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); CachedData(DataAccess access, Order order) { this.access = access; this.order = order; } void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 當前快取已經失效了,即已經發生了寫事件 // 在獲取寫鎖之前必須釋放讀鎖,否則會造成死鎖 rwl.readLock().unlock(); rwl.writeLock().lock(); try { /* 重新判斷快取是否是失效的, 因為在這個過程中可能已經有其它的執行緒對這個快取的資料進行修改了 */ if (!cacheValid) { data = access.queryData(); cacheValid = true; } /* 獲取讀鎖,在持有寫鎖的情況下,可以獲得讀鎖,這也被稱為 “鎖降級” */ rwl.readLock().lock(); } finally { // 釋放寫鎖,此時依舊持有讀鎖 rwl.writeLock().unlock(); } } try { order.useData(data); } finally { // 注意最後一定要釋放鎖 rwl.readLock().unlock(); } } interface DataAccess { Object queryData(); } interface Order { void useData(Object data); } }
原始碼解析
建構函式
首先,檢視 ReentrantReadWriteLock
例項物件的屬性
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { // ReentrantReadWriteLock 的靜態內部類,為讀鎖 private final ReentrantReadWriteLock.ReadLock readerLock; // ReentrantReadWriteLock 的靜態內部類,為寫鎖 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步工具類,為 AQS 的具體子類 final Sync sync; public ReentrantReadWriteLock() { this(false); } // 建構函式,初始化 ReentrantReadWriteLock,預設情況選擇使用非公平的同步工具 public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } }
再檢視 ReadLock
和 WriteLock
的相關定義,首先檢視 ReadLock
相關的原始碼:
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync; // 注意這裡的 sync
}
// 省略其它一些不是特別重要的程式碼
}
再檢視 WriteLock
相關的原始碼:
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync; // 注意這裡的 sync
}
// 省略其它一些不是特別重要的程式碼
}
可以看到,ReadLock
和 WriteLock
都使用了同一個 Sync
例項物件來維持自身的同步需求,這點很關鍵
原理
ReadLock
中關於獲取鎖和釋放鎖的原始碼:
// 獲取鎖
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 釋放鎖
public void unlock() {
sync.releaseShared(1);
}
WriteLock
中關於獲取鎖和釋放鎖的原始碼:
// 獲取鎖
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 釋放鎖
public void unlock() {
sync.release(1);
}
通過對比 ReadLock
和 WriteLock
中獲取鎖和釋放鎖的原始碼,很明顯,ReadLock
是以 “共享模式” 的方式獲取和釋放鎖,而 WriteLock
則是通過以獨佔的方式來獲取和釋放鎖。這兩種獲取和釋放鎖的實現都在 AQS
中定義,在此不做過多的詳細介紹
再結合上文關於 ReadLock
和 WriteLock
的建構函式,可以發現它們是使用了同一個 AQS
子類例項物件,也就是說,在 ReentrantReadWriteLock
中的 AQS
的具體子類既使用了“共享模式”,也使用了“獨佔模式”
更一般地來講,回憶一下 AQS
關於 “共享模式” 和 “獨佔模式” 對於 state
變數的使用,“共享模式” 將 state
共享,每個執行緒都能訪問 state
;“獨佔模式” 下,state
被視作是獲取到鎖的狀態,0 表示還沒有執行緒獲取該鎖,大於 0 則表示執行緒獲取鎖的重入次數
為了能夠實現 ReentrantReadWriteLock
中的兩個模式的共用的功能,ReentrantReadWriteLock
中 Sync
類對 state
進行了如下的處理:
ReentrantReadWriteLock
使用了一個 16 位的狀態來表示寫入鎖的計數,並且使用了另外一個 16 位的狀態來表示讀鎖的計數
就是說,state
變數已經被拆分成了兩部分,由於 state
是一個 32 位的整數,現在 state
的前 16 位用於單獨處理“共享模式”,而後 16 位則用於處理 “獨佔模式”
Sync
核心部分就是分析 Sync
的原始碼,在這裡定義了對 state
變數的修改以及獲取鎖和釋放鎖的邏輯
首先檢視 Sync
相關欄位屬性:
abstract static class Sync extends AbstractQueuedSynchronizer {
// 這幾個欄位的作用就是將 state 劃分為兩部分,前 16 位為共享模式,後 16 位為獨佔模式
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// -1 的目的值為了得到最大值
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 取 c 的前 16 位, 只需要右移即可
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 取 c 的後 16 位,只要與對應的掩碼按位與即可
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 該類的作用是記錄每個執行緒持有的讀鎖的數量
static final class HoldCounter {
// 執行緒持有的讀鎖的數量
int count = 0;
// 執行緒的 ID
final long tid = getThreadId(Thread.currentThread());
}
// ThreadLocal 的子類
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/*
用於記錄執行緒持有的讀鎖資訊
*/
private transient ThreadLocalHoldCounter readHolds;
/*
用於快取,用於記錄 “最後一個獲取讀鎖” 的執行緒的讀鎖的重入次數
*/
private transient HoldCounter cachedHoldCounter;
/*
第一個獲取讀鎖的執行緒(並且未釋放讀鎖)
*/
private transient Thread firstReader = null;
// 第一個獲取讀鎖的執行緒持有的讀鎖的數量(重入次數)
private transient int firstReaderHoldCount;
Sync() {
// 初始化 readHolds
readHolds = new ThreadLocalHoldCounter();
// 確保 readHolds 的可見性
setState(getState()); // ensures visibility of readHolds
}
}
讀鎖
-
讀鎖的獲取
再回到
ReadLock
部分,獲取鎖的原始碼如下:public void lock() { sync.acquireShared(1); } // 與之對應的 AQS 的程式碼 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
重點在於
tryAcquireShared(arg)
方法,該方法在Sync
中定義:protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); /* exclusiveCount(c) != 0 說明當前有執行緒持有寫鎖,在這種情況下就不能直接獲取讀鎖 但是如果持有寫鎖的執行緒時當前執行緒,那麼就可以繼續獲取讀鎖 */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); // 讀鎖的獲取次數 if (!readerShouldBlock() && // 讀鎖是否需要阻塞 r < MAX_COUNT && // 判斷獲取鎖的次數是否溢位(2^16 - 1) compareAndSetState(c, c + SHARED_UNIT)) { // 將讀鎖的獲取次數 +1 // 此時已經獲取到了讀鎖 // r == 0 說明執行緒是第一個獲取讀鎖的,或者前面獲取讀鎖的執行緒都已經釋放了讀鎖 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 是否重入 firstReaderHoldCount++; } else { // 更新快取,即最後一個獲取讀鎖的執行緒 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) // 快取當前的執行緒持有的讀鎖的數量 readHolds.set(rh); rh.count++; } // 返回一個大於 0 的數,表示已經獲取到了讀鎖 return 1; } return fullTryAcquireShared(current); }
如果要走到最後一個
return
語句,可能有以下幾種情況:readerShouldBlock()
返回true
,這可能有兩種情況:在FairSync
中的hasQueuedPredecessors()
方法返回true
,即阻塞佇列中存在其它元素在等待鎖;在NoFairSync
中的apparentlyFirstQueuedIsExclusive()
方法返回true
,即判斷阻塞佇列中head
的後繼節點是否是用來獲取寫鎖的,如果是的話,那麼讓這個鎖先來,避免寫鎖飢餓- 持有寫鎖的數量超過最大值(2^16 - 1)
CAS
失敗,即存在競爭關係,可能是多個執行緒爭奪一個讀鎖,或者多個執行緒爭奪一個寫鎖
如果是發生了以上幾種情況,那麼就需要呼叫
fullTryAcquireShared
再次嘗試
fullTryAcquireShared(current)
方法對應的原始碼:/* 引入這個方法的目的是為了減少鎖競爭 */ final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { // 永真迴圈避免由於 CAS 失敗直接退出的情況 int c = getState(); // 如果其它執行緒持有了寫鎖,自然是獲取不到鎖了,因此需要進入到阻塞佇列 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // 處理重入 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { /* cachedHoldCounter 快取的不是當前的執行緒,那麼到 ThreadLocal 中獲取當前執行緒的 HolderCounter, 如果執行緒從來沒有初始化過 ThreadLocal 的值,那麼 get() 方法將會執行初始化 */ rh = readHolds.get(); /* rh.count == 0 說明上一行程式碼只是單純地初始化,那麼它依舊是需要去排隊的 */ if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { /* 如果在這裡已經 CAS 成功了,那麼久意味著成功獲取讀鎖了, 下面要做的就是設定 firstReader 或 cachedHoldCounter */ if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { // 設定 cachedHoldCounter 為當前的執行緒 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; // 大於 0 表示獲取到了鎖 } } }
-
讀鎖的釋放
ReadLock
釋放鎖的程式碼如下:public void unlock() { sync.releaseShared(1); }
這個方法位於
AQS
中,具體的定義如下:// 就是一般的釋放共享變數的邏輯,具體的模版方法為 tryReleaseShared,需要子類去具體實現 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); // 這裡是 AQS 的相關知識,再次不做過多的介紹 return true; } return false; }
Sync
中對於tryReleaseShared
的具體實現如下:protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1) /* 如果 firstReaderHoldCount == 1,那麼將 firstReader 置為 null 這是為了給後續的執行緒使用 */ firstReader = null; else firstReaderHoldCount--; } else { // 判斷 cachedHoldCounter 是否快取的是當前的執行緒,如果不是的話,那麼久需要從 ThreadLocal 中獲取 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 這一步將 ThreadLocal 移除掉,這是為了避免記憶體洩露,因此當前執行緒已經不再持有讀鎖了 readHolds.remove(); if (count <= 0) // unlock() 次數太多了 throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); // nextc 是 state 高 16 位 -1 後的值 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) /* 如果 nextc == 0,那就是 state 全部 32 位都為 0,即讀鎖和寫鎖都已經全部被釋放了 此時在這裡返回 true 的話,其實是幫助喚醒後繼節點中獲取鎖的執行緒 */ return nextc == 0; } }
寫鎖
-
寫鎖的獲取
寫鎖是獨佔鎖,因此如果已經有執行緒獲取到了讀鎖,那麼寫鎖需要進入到阻塞佇列中等待
寫鎖加鎖的原始碼:
public void lock() { sync.acquire(1); // 標準的 AQS 寫法 }
重點在於
Sync
類對於tryAcquire
的實現,具體的原始碼如下:protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { /* c != 0 && w == 0: 寫鎖可用,但是有執行緒持有讀鎖(也可能是自己持有) c != 0 && w != 0 && current != getExclusiveOwnerThread(): 其它執行緒持有寫鎖 也就是說,只要有讀鎖或者寫鎖被佔用,這次就不能獲取到寫鎖 */ if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 這裡不需要 CAS,因為能夠走到這的只可能是寫鎖重入 setState(c + acquires); return true; } // 如果寫鎖獲取不需要阻塞,那麼則執行 CAS,成功則代表獲取到了寫鎖 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
-
寫鎖的釋放
寫鎖釋放的原始碼如下:
public void unlock() { sync.release(1); // 標準的 AQS 的使用 }
與之密切相關的就是
Sync
對於tryRelease
方法的具體實現,具體的實現程式碼如下所示:// 簡單來講就是將 state 中關於寫鎖的持有的數量 -1 protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
鎖降級
持有寫鎖的執行緒,去獲取讀鎖的這個過程被稱為鎖降級,這樣的話,一個執行緒可能既持有寫鎖,也持有讀鎖。但是,是不存在鎖升級這個情況的,因為如果一個持有讀鎖的執行緒,再去嘗試獲取寫鎖,這種情況下就有可能會發生死鎖
參考:
[1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html