1. 程式人生 > 實用技巧 >學習筆記 sync/RWMutex原始碼

學習筆記 sync/RWMutex原始碼

RWMutex是一個讀寫鎖,該鎖可以加多個讀鎖或者一個寫鎖,其經常用於讀次數遠遠多於寫次數的場景.
func (rw *RWMutex) Lock() 寫鎖,如果在新增寫鎖之前已經有其他的讀鎖和寫鎖,則lock就會阻塞直到該鎖可用,為確保該鎖最終可用,已阻塞的 Lock 呼叫會從獲得的鎖中排除新的讀取器,即寫鎖許可權高於讀鎖,有寫鎖時優先進行寫鎖定
func (rw *RWMutex) Unlock() 寫鎖解鎖,如果沒有進行寫鎖定,則就會引起一個執行時錯誤.
func (rw *RWMutex) RLock() 讀鎖,當有寫鎖時,無法載入讀鎖,當只有讀鎖或者沒有鎖時,可以載入讀鎖,讀鎖可以載入多個,所以適用於"讀多寫少"的場景
func (rw *RWMutex) RUnlock() 讀鎖解鎖,RUnlock 撤銷單次 RLock 呼叫,它對於其它同時存在的讀取器則沒有效果。若 rw 並沒有為讀取而鎖定,呼叫 RUnlock 就會引發一個執行時錯誤

 
package sync
 
import (
    "internal/race"
    "sync/atomic"
    "unsafe"
)
 
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.
 
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex. // // A RWMutex must not be copied after first use. // // If a goroutine holds a RWMutex for reading and another goroutine might // call Lock, no goroutine should expect to be able to acquire a read lock // until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes // available; a blocked Lock call excludes new readers from acquiring the // lock. type RWMutex struct { w Mutex // held if there are pending writers // 互斥鎖 writerSem uint32 // semaphore for writers to wait for completing readers 寫鎖訊號量 readerSem uint32 // semaphore for readers to wait for completing writers 讀鎖訊號量 readerCount int32 // number of pending readers 讀鎖計數器 readerWait int32 // number of departing readers 獲取寫鎖時需要等待的讀鎖釋放數量 } const rwmutexMaxReaders = 1 << 30 // 支援最多2^30個讀鎖 // RLock locks rw for reading. // // It should not be used for recursive read locking; a blocked Lock // call excludes new readers from acquiring the lock. See the // documentation on the RWMutex type. // 它不應該用於遞迴讀鎖定; func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } // 每次goroutine獲取讀鎖時,readerCount+1 // 如果寫鎖已經被獲取,那麼readerCount在 - rwmutexMaxReaders與 0 之間,這時掛起獲取讀鎖的goroutine, // 如果寫鎖沒有被獲取,那麼readerCount>=0,獲取讀鎖,不阻塞 // 通過readerCount的正負判斷讀鎖與寫鎖互斥,如果有寫鎖存在就掛起讀鎖的goroutine,多個讀鎖可以並行 if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. // 將goroutine排到G佇列的後面,掛起goroutine, 監聽readerSem訊號量 runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } } // RUnlock undoes a single RLock call; // it does not affect other simultaneous readers. // It is a run-time error if rw is not locked for reading // on entry to RUnlock. // 讀鎖不會影響其他讀操作 // 如果在進入RUnlock時沒有鎖沒有被施加讀鎖的話,則會出現執行時錯誤。 func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } // 讀鎖計數器 -1 // 有四種情況,其中後面三種都會進這個 if // 【一】有讀鎖,但沒有寫鎖被掛起 // 【二】有讀鎖,且也有寫鎖被掛起 // 【三】沒有讀鎖且沒有寫鎖被掛起的時候, r+1 == 0 // 【四】沒有讀鎖但是有寫鎖被掛起,則 r+1 == -(1 << 30) if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } func (rw *RWMutex) rUnlockSlow(r int32) { // 讀鎖早就被沒有了,那麼在此 -1 是需要拋異常的 // 這裡只有當讀鎖沒有的時候才會出現的兩種極端情況 // 【一】沒有讀鎖且沒有寫鎖被掛起的時候, r+1 == 0 // 【二】沒有讀鎖但是有寫鎖被掛起,則 r+1 == -(1 << 30) if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. // 如果獲取寫鎖時的goroutine被阻塞,這時需要獲取讀鎖的goroutine全部都釋放,才會被喚醒 // 更新需要釋放的 寫鎖的等待讀鎖釋放數目 // 最後一個讀鎖解除時,寫鎖的阻塞才會被解除. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. // 更新訊號量,通知被掛起的寫鎖去獲取鎖 runtime_Semrelease(&rw.writerSem, false, 1) } } // Lock locks rw for writing. // If the lock is already locked for reading or writing, // Lock blocks until the lock is available. // 對一個已經lock的rw上鎖會被阻塞 // 如果鎖已經鎖定以進行讀取或寫入,則鎖定將被阻塞,直到鎖定可用。 func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. // 首先,獲取互斥鎖,與其他來獲取寫鎖的goroutine 互斥 rw.w.Lock() // Announce to readers there is a pending writer. // 告訴其他來獲取讀鎖操作的goroutine,現在有人獲取了寫鎖 // 減去最大的讀鎖數量,用0 -負數 來表示寫鎖已經被獲取 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. // 設定需要等待釋放的讀鎖數量,如果有,則掛起獲取 競爭寫鎖 goroutine if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { // 掛起,監控寫鎖訊號量 runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } } // Unlock unlocks rw for writing. It is a run-time error if rw is // not locked for writing on entry to Unlock. // // As with Mutexes, a locked RWMutex is not associated with a particular // goroutine. One goroutine may RLock (Lock) a RWMutex and then // arrange for another goroutine to RUnlock (Unlock) it. // 如果在寫鎖時,rw沒有被解鎖,則會出現執行時錯誤。 // 與互斥鎖一樣,鎖定的RWMutex與特定的goroutine無關。 // 一個goroutine可以RLock(鎖定)RWMutex然後安排另一個goroutine到RUnlock(解鎖)它。 func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. // 向讀鎖的goroutine發出通知,現在已經沒有寫鎖了 // 還原加鎖時減去的那一部分readerCount r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { // 讀鎖數目超過了 最大允許數 race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. // 喚醒獲取讀鎖期間所有被阻塞的goroutine for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() // 釋放互斥鎖資源 if race.Enabled { race.Enable() } } // RLocker returns a Locker interface that implements // the Lock and Unlock methods by calling rw.RLock and rw.RUnlock. // RLocker返回一個Locker介面的實現 // 通過呼叫rw.RLock和rw.RUnlock來鎖定和解鎖方法。 func (rw *RWMutex) RLocker() Locker { return (*rlocker)(rw) } type rlocker RWMutex func (r *rlocker) Lock() { (*RWMutex)(r).RLock() } func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

從上面的程式碼中我們可以看到,讀寫鎖首先是內建了一個互斥鎖,然後再加上維護各種計數器來實現的讀寫鎖,緊接著提供了四個函式支撐著讀寫鎖操作,由 Lock 和Unlock 分別支援寫鎖的鎖定和釋放,由RLock 和RUnlock 來支援讀鎖的的鎖定和釋放。其中,讀鎖不涉及 內建mutex的使用,寫鎖用了mutex來排斥其他寫鎖。

讀寫互斥鎖的實現比較有技巧性一些,需要幾點

1. 讀鎖不能阻塞讀鎖,引入readerCount實現

2. 讀鎖需要阻塞寫鎖,直到所以讀鎖都釋放,引入readerSem實現

3. 寫鎖需要阻塞讀鎖,直到所以寫鎖都釋放,引入wirterSem實現

4. 寫鎖需要阻塞寫鎖,引入Metux實現

【讀鎖的】Rlock:

【讀鎖的】RUnlock:

【寫鎖的】Lock:

【寫鎖的】Unlock:

參考https://blog.csdn.net/qq_25870633/article/details/83448234