AQS系列(三)- ReentrantReadWriteLock讀寫鎖的加鎖
前言
前兩篇我們講述了ReentrantLock的加鎖釋放鎖過程,相對而言比較簡單,本篇進入深水區,看看ReentrantReadWriteLock-讀寫鎖的加鎖過程是如何實現的,繼續拜讀老Lea凌厲的程式碼風。
一、讀寫鎖的類圖
讀鎖就是共享鎖,而寫鎖是獨佔鎖。讀鎖與寫鎖之間的互斥關係為:讀讀可同時執行(有條件的);讀寫與寫寫均互斥執行。注意此處讀讀可並行我用了有條件的並行,後文會對此做介紹。
繼續奉上一張醜陋的類圖:
可以看到ReentrantReadWriteLock維護了五個內部類,ReentrantReadWriteLock中存放了Sync、ReadLock、WriteLock三個成員變數,如下截圖所示:
而ReadLock和WriteLock中又存放了Sync變數,截圖如下所示,這樣一組合,有了四種鎖,公平讀鎖、公平寫鎖、非公平讀鎖、非公平寫鎖。對於公平與非公平的實現區別,我們上一篇已經做過講解,本文將著重關注讀鎖和寫鎖的實現區別。
二、加鎖原始碼
在前文中我們知道,ReentrantLock中用state來判斷當前鎖是否被佔用,而讀寫鎖ReentrantReadWriteLock中由於同時存在兩種鎖,所以老Lea用state的高16位來存放讀鎖的佔用狀態以及重入次數,低16位存放寫鎖的佔用狀態和重入次數。
1、讀鎖加鎖,即共享鎖加鎖
1 public void lock() { 2 sync.acquireShared(1); // 獲取共享鎖方法 3 }
上述lock方法中呼叫的獲取共享鎖方法是在AbstractQueuedSynchronizer中實現的,程式碼如下:
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3 doAcquireShared(arg); 4 }
可以看到獲取共享鎖分成了兩步,第一步是嘗試獲取,如果獲取不到再進入if裡面執行doAcquireShared方法,下面分別追蹤。
1)、tryAcquireShared方法
1 protected final int tryAcquireShared(int unused) { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 // 1.有寫鎖佔用並且不是當前執行緒,則直接返回獲取失敗 5 if (exclusiveCount(c) != 0 && 6 getExclusiveOwnerThread() != current) 7 return -1; 8 // 執行到這裡,有兩種情況 沒有寫鎖佔用或者是當前執行緒 9 int r = sharedCount(c); // 獲取讀鎖次數 10 // 2、不應該阻塞則獲取鎖 @此方法有點意思,需著重講解,作用:判斷讀鎖是否需要阻塞 11 if (!readerShouldBlock() && 12 r < MAX_COUNT && 13 compareAndSetState(c, c + SHARED_UNIT)) { 14 // 如果CAS成功,則將當前執行緒對應的計數+1 15 if (r == 0) { // 如果讀鎖持有數為0,則說明當前執行緒是第一個reader,分別給firstReader和firstReaderHoldCount初始化 16 firstReader = current; 17 firstReaderHoldCount = 1; 18 } else if (firstReader == current) { // 如果讀鎖持有數不為0且當前執行緒就是firstReader,那麼直接給firstReaderHoldCount+1,表示讀鎖重入 19 firstReaderHoldCount++; 20 } else { // 其他情況,即當前執行緒不是firstReader且還有其他執行緒持有讀鎖,則要獲取到當前執行緒對應的HoldCounter,然後給裡面的計數+1 21 HoldCounter rh = cachedHoldCounter; 22 if (rh == null || rh.tid != getThreadId(current)) 23 cachedHoldCounter = rh = readHolds.get(); 24 else if (rh.count == 0) 25 readHolds.set(rh); 26 rh.count++; 27 } 28 return 1; 29 } 30 // 3、應該阻塞或者CAS失敗則進入此方法獲取鎖 31 return fullTryAcquireShared(current); 32 }
結合上述程式碼中的註釋,將邏輯分三部分,我們一步步分析此方法的邏輯。
首先第一步,判斷如果有寫鎖並且當前執行緒不是寫鎖的執行緒,則直接退出獲取讀鎖的嘗試,因為讀寫是互斥的,退出此方法後就會進入doAcquireShared方法,後續邏輯見下面的2)。但此處還是要看一下寫鎖狀態統計方法exclusiveCount和讀鎖狀態統計方法sharedCount,方法原始碼如下截圖所示:
可以看到,exclusiveCount方法是將c和獨佔掩碼進行與操作,獨佔掩碼EXCLUSIVE_MASK高16位均為0,低16位均為1,按位與計算之後就剩下c的低16位,這就是第二部分一開始說的低16位存放寫鎖重入次數;同理看sharedCount方法,將c有符號右移16位,這樣移位之後低16位就是原來的高16位,即讀鎖的加鎖次數。老Lea通過這兩個方法實現了用一個int型別的state存放寫鎖讀鎖兩個加鎖次數的結果,是不是看起來就很高階!
然後看第二步,判斷讀不應該阻塞(即readerShouldBlock方法返回false)且讀鎖持有次數小於最大值且CAS成功,則進入方法中嘗試獲取讀鎖。先看看重點方法readerShouldBlock什麼時候會返回false(不阻塞)什麼時候返回true(阻塞)。此方法在非公平模式和公平模式中有不同的實現,公平模式程式碼:
1 final boolean readerShouldBlock() { 2 return hasQueuedPredecessors(); 3 }
看到了一個熟悉的身影,hashQueuedPredecessors方法,這不就是在ReentrantLock中公平鎖加鎖時的方法麼?詳細可看我的AQS系列(一)中的講解,總結一下就是該方法判斷佇列前面是否有在排隊的非當前執行緒,意思就是按排隊順序獲取鎖,不要爭搶。
非公平模式程式碼:
1 final boolean readerShouldBlock() { 2 return apparentlyFirstQueuedIsExclusive(); 3 }
1 final boolean apparentlyFirstQueuedIsExclusive() { 2 Node h, s; 3 return (h = head) != null && 4 (s = h.next) != null && 5 !s.isShared() && 6 s.thread != null; 7 }
在後面的方法中,返回了一個四個條件組成的布林值,邏輯為頭節點不為空並且頭節點後的第一個節點不為空並且這個節點是獨佔的並且執行緒不為空,此時返回true即當前這個讀操作應該阻塞,不讓它獲取到鎖。那麼問題來了,為什麼要有這個邏輯?此處是為了避免一種異常情況的發生,如果後面有一個排隊的寫鎖在等待獲取鎖,而這時有一個讀鎖正在執行中,若在讀鎖執行完之前又來了一個讀鎖,因為讀鎖與讀鎖不阻塞所以後來的的讀鎖又獲取到了鎖,這時在佇列第一個位置排隊的寫鎖仍然在傻傻的等著,沒辦法,誰讓你沒關係。就這樣,如果一直有讀鎖在當前正在執行的讀鎖執行完之前進來獲取讀鎖,那麼後面的寫鎖就會一直傻等在那,永遠都沒法獲取鎖。所以Lea就設計了這個方法來避免這種情況的發生,即如果判斷佇列第一位排隊的是寫鎖,那麼後面的讀鎖就先等一等,等這個寫鎖執行完了你們再執行。這也就是我在文章的開始講的-讀讀同時執行是有條件的,這個條件就是指這裡。
看第二步之前要先說說讀鎖的處理邏輯,因為是可重入的讀鎖,所以需要記錄每個獲取讀鎖執行緒的重入次數,即每個讀的執行緒都有一個與其對應的重入次數。然後繼續看第二步中讀鎖獲取鎖成功(即CAS成功)之後的邏輯:如果讀鎖持有數為0,則說明當前執行緒是第一個reader,分別給firstReader和firstReaderHoldCount初始化;如果讀鎖持有數不為0且當前執行緒就是firstReader,那麼直接給firstReaderHoldCount+1,表示讀鎖重入;否則,即當前執行緒不是firstReader且還有其他執行緒持有讀鎖,則要獲取到當前執行緒對應的HoldCounter,然後給裡面的計數+1。
下面再一起看看【否則】中的邏輯,貼上一下Sync中的部分程式碼
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 // ... 3 static final class HoldCounter { 4 int count = 0; 5 // Use id, not reference, to avoid garbage retention 6 final long tid = getThreadId(Thread.currentThread()); 7 } 8 9 static final class ThreadLocalHoldCounter 10 extends ThreadLocal<HoldCounter> { 11 public HoldCounter initialValue() { 12 return new HoldCounter(); 13 } 14 } 15 16 private transient ThreadLocalHoldCounter readHolds; 17 18 private transient HoldCounter cachedHoldCounter; 19 20 private transient Thread firstReader = null; 21 private transient int firstReaderHoldCount; 22 23 Sync() { 24 readHolds = new ThreadLocalHoldCounter(); 25 setState(getState()); // ensures visibility of readHolds 26 } 27 // ... 28 }
可以看到,Sync中快取了一個HoldCounter,存放的是最近一次讀鎖記錄。而如果當前執行緒不是最近一次記錄的HoldCounter,則去readHolds中取,readHolds是ThreadLocalHoldCounter型別,在Sync的無參構造器中初始化,它與HoldCounter都是Sync的內部類,ThreadLocalHoldCounter就是一個ThreadLocal,內部維護了一個執行緒與HoldCounter的鍵值對map,一個執行緒對應一個HoldCounter。所以【否則】中的邏輯加註釋如下所示:
1 HoldCounter rh = cachedHoldCounter; // 獲取最近一次記錄的HoldCounter,此快取是為了提高效率,不用每次都去ThreadLocal中取 2 if (rh == null || rh.tid != getThreadId(current)) // 判斷當前執行緒是不是最近一次記錄的HoldCounter 3 cachedHoldCounter = rh = readHolds.get(); // 如果不是,則去Sync中的ThreadLocal中獲取,然後再放在快取中 4 else if (rh.count == 0) // 如果count計數為0,說明是第一次重入,則將HoldCounter加入ThreadLocal中 5 readHolds.set(rh); 6 rh.count++; // 當前執行緒重入次數+1
下面進入第三步,fullTryAcquireShared方法,進入此方法的前提條件是沒有寫鎖且 (讀應該阻塞或者讀鎖CAS失敗)。看這個full方法的邏輯:
1 final int fullTryAcquireShared(Thread current) { 2 3 HoldCounter rh = null; 4 for (;;) { // 無限迴圈直到有確定的結果返回 5 int c = getState(); 6 if (exclusiveCount(c) != 0) { // 1、有獨佔鎖且不是當前執行緒,直接返回讀鎖加鎖失敗 7 if (getExclusiveOwnerThread() != current) 8 return -1; 9 // else we hold the exclusive lock; blocking here 10 // would cause deadlock. 11 } else if (readerShouldBlock()) { // 2、判斷讀是否應該阻塞 12 // Make sure we're not acquiring read lock reentrantly 13 if (firstReader == current) { // 判斷如果當前執行緒就是firstReader,那麼什麼都不做,進入3中嘗試獲取鎖,why? 因為這說明當前執行緒之前就持有了鎖還沒釋放,所以可以繼續獲取 14 // assert firstReaderHoldCount > 0; 15 } else { // 2.5 此處邏輯需要仔細研讀,乍看時看的一頭霧水 16 if (rh == null) { // 第一次進來時rh肯定==null 17 rh = cachedHoldCounter; 18 if (rh == null || rh.tid != getThreadId(current)) { 19 rh = readHolds.get(); 20 if (rh.count == 0) // 如果當前執行緒沒獲取到過讀鎖,則從本地執行緒變數中移除HoldCounter,因為下一步就要判定它獲取鎖失敗先不讓它獲取了 21 readHolds.remove(); 22 } 23 }// 能走到這裡,說明當前讀鎖應該阻塞且不是firstReader 24 if (rh.count == 0) // 再加上當前執行緒沒獲取到過讀鎖,則先不讓它嘗試獲取鎖了,直接返回獲取失敗 25 return -1; 26 } 27 } 28 if (sharedCount(c) == MAX_COUNT) 29 throw new Error("Maximum lock count exceeded"); 30 // 3、再次嘗試獲取鎖 31 if (compareAndSetState(c, c + SHARED_UNIT)) { 32 if (sharedCount(c) == 0) { 33 firstReader = current; 34 firstReaderHoldCount = 1; 35 } else if (firstReader == current) { 36 firstReaderHoldCount++; 37 } else { 38 if (rh == null) 39 rh = cachedHoldCounter; 40 if (rh == null || rh.tid != getThreadId(current)) 41 rh = readHolds.get(); 42 else if (rh.count == 0) 43 readHolds.set(rh); 44 rh.count++; 45 cachedHoldCounter = rh; // cache for release 46 } 47 return 1; 48 } 49 } 50 }
詳細看看註解以及原始碼註釋、程式碼邏輯,相信能理解這個過程。
2)、doAcquireShared方法
1 private void doAcquireShared(int arg) { 2 // 將當前讀鎖加到佇列後面 3 final Node node = addWaiter(Node.SHARED); 4 boolean failed = true; 5 try { 6 boolean interrupted = false; 7 for (;;) { 8 // 得到前一個節點 9 final Node p = node.predecessor(); 10 if (p == head) { // 如果前一個節點是頭節點,則嘗試獲取鎖 11 int r = tryAcquireShared(arg); 12 if (r >= 0) { // 設定頭節點並且啟用後續的節點 13 setHeadAndPropagate(node, r); 14 p.next = null; // help GC 15 if (interrupted) 16 selfInterrupt(); 17 failed = false; 18 return; 19 } 20 }// 判斷應該掛起則掛起執行緒 21 if (shouldParkAfterFailedAcquire(p, node) && 22 parkAndCheckInterrupt()) 23 interrupted = true; 24 } 25 } finally { 26 if (failed) 27 cancelAcquire(node); 28 } 29 }
該方法跟之前系列中ReentrantLock的加鎖過程類似,在此就不做過多的解釋了,總之還是通過park來掛起。
2、寫鎖加鎖,即獨佔鎖加鎖
進入lock方法:
1 public void lock() { 2 sync.acquire(1); 3 }
熟悉的樣子,繼續 點進去:
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
還是原先的方法,但是各個方法的實現有區別了。先看第一個tryAcquire:
1 protected final boolean tryAcquire(int acquires) { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 int w = exclusiveCount(c); 5 if (c != 0) { // 如果排它鎖存在,則判斷是不是當前執行緒,如果也不是當前執行緒,則直接返回獲取失敗 6 // (Note: if c != 0 and w == 0 then shared count != 0) 7 if (w == 0 || current != getExclusiveOwnerThread()) 8 return false; 9 if (w + exclusiveCount(acquires) > MAX_COUNT) 10 throw new Error("Maximum lock count exceeded"); 11 // Reentrant acquire 12 setState(c + acquires); 13 return true; 14 } // 判斷讀鎖要不要阻塞,此處針對公平鎖和非公平鎖有不同的實現,對於非公平鎖統一返回false表示不要阻塞,而公平鎖則會檢視前面還有沒有鎖來判斷要不要阻塞 15 if (writerShouldBlock() || 16 !compareAndSetState(c, c + acquires)) 17 return false; 18 setExclusiveOwnerThread(current); 19 return true; 20 }
然後是addWaiter在佇列末尾新增node節點排隊,這個方法在AbstractQueuedSynchronizer中,同樣是熟悉的方法了,此處略過不提。
最後是acquireQueued方法,如下所示,又是熟悉的程式碼,跟ReentrantLock中的加鎖方法一毛一樣,唯一的不同點是第7行呼叫的tryAcquire方法的實現,此處調的是ReentrantReadWriteLock類中Sync的方法,也就是上面的第一個方法。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) { 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
寫鎖的加鎖過程基本就這些了,相對來說比讀鎖加鎖容易了很多,因為大多都跟ReentrantLock中的實現相仿。
後記
讀寫鎖的加鎖過程到此為止,最近每晚下班回來讀一會,斷斷續續的四晚上才搞定,難受 >&