java併發鎖ReentrantReadWriteLock讀寫鎖原始碼分析
阿新 • • 發佈:2019-02-16
鎖的釋放,比較簡單,程式碼@1,主要是將當前執行緒所持有的鎖的數量資訊得到(從firstReader或cachedHoldCounter,或readHolds中獲取 ),然後將數量減少1,如果持有數為1,則直接將該執行緒變數從readHolds ThreadLocal變數中移除,避免垃圾堆積。 程式碼@2,就是在無限迴圈中將共享鎖的數量減少一,在釋放鎖階段,只有當所有的讀鎖,寫鎖被佔有,才會去執行doReleaseShared方法。 2.2 WriterLock 原始碼分析public void unlock() { sync.releaseShared(1); } //AbstractQueuedSynchronizer的 realseShared方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // ReentrantReadWriterLock.Sync tryReleaseShared protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // @1 start // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; // @1 end } for (;;) { // @2 int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } } AbstractQueuedSynchronizer的doReleaseShared /** * Release action for shared mode -- signal successor and ensure * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
2.2.1 lock方法詳解
程式碼@1,如果鎖的state不為0,說明有寫鎖,或讀鎖,或兩種鎖持有 程式碼@2,如果寫鎖為0,再加上c!=0,說明此時有讀鎖,自然返回false,表示只能排隊去獲取寫鎖 如果寫鎖不為0,如果持有寫鎖的執行緒不為當前執行緒,自然返回false,排隊去獲取寫鎖。 程式碼@3,表示,當前執行緒持有寫鎖,現在是重入,所以只需要修改鎖的額數量即可。 程式碼@4,表示,表示通過一次CAS去獲取鎖的時候失敗,說明被別的執行緒搶去了,也返回false,排隊去重試獲取鎖。 程式碼@5,成獲取寫鎖後,將當前執行緒設定為佔有寫鎖的執行緒。嘗試獲取鎖方法結束。如果該方法返回false,則進入到acquireQueue方法去排隊獲取寫鎖,寫鎖的獲取過程,與ReentrantLock獲取方法一樣,就不過多的解讀了。public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 對上述程式碼是不是似曾相識,對了,在學習ReentrantLock時候,看到的一樣,acquire是在AbstractQueuedSynchronizer中,關鍵是在 tryAcquire方法,是在不同的子類中實現的。那我們將目光移到ReentrantReadWriterLock.Sync中 protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // @1 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) //@2 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); //@3 return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) //@4 return false; setExclusiveOwnerThread(current); //@5 return true; }