go中的讀寫鎖使用小結
讀寫鎖
基本結構
寫鎖需要阻塞寫鎖:一個協程擁有寫鎖時,其他協程寫鎖定需要阻塞
寫鎖需要阻塞讀鎖:一個協程擁有寫鎖時,其他協程讀鎖定需要阻塞
讀鎖需要阻塞寫鎖:一個協程擁有讀鎖時,其他協程寫鎖定需要阻塞
讀鎖不能阻塞讀鎖:一個協程擁有讀鎖時,其他協程也可以擁有讀鎖
RWMutex提供4個簡單的介面來提供服務:
RLock():讀鎖定 RUnlock():解除讀鎖定 Lock(): 寫鎖定,與Mutex完全一致 Unlock():解除寫鎖定,與Mutex完全一致
看下具體的實現
type RWMutex struct {
w Mutex // 用於控制多個寫鎖,獲得寫鎖首先要獲取該鎖,如果有一個寫鎖在進行,那麼再到來的寫鎖將會阻塞於此
writerSem uint32 // 寫阻塞等待的訊號量,最後一個讀者釋放鎖時會釋放訊號量
readerSem uint32 // 讀阻塞的協程等待的訊號量,持有寫鎖的協程釋放鎖後會釋放訊號量
readerCount int32 // 記錄讀者個數
readerWait int32 // 記錄寫阻塞時讀者個數
}
RLock
// 讀加鎖 // 增加讀操作計數,即readerCount++ // 阻塞等待寫操作結束(如果有的話) func (rw *RWMutex) RLock() { // 競態檢測 if race.Enabled { _ = rw.w.state race.Disable() } // 首先通過atomic的原子性使readerCount+1 // 1、如果readerCount<0。說明寫鎖已經獲取了,那麼這個讀鎖需要等待寫鎖的完成 // 2、如果readerCount>=0。當前讀直接獲取鎖 if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 當前有個寫鎖, 讀操作阻塞等待寫鎖釋放 runtime_SemacquireMutex(&rw.readerSem, false, 0) } // 是否開啟檢測race if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } }
上面的通過判斷readerCount,來判斷是否有寫鎖,為什麼呢?
寫操作是如何阻止讀操作的
這個是讀寫鎖實現中最精華的技巧。
我們知道RWMutex.readerCount
是個整型值,用於表示讀者數量,不考慮寫操作的情況下,每次讀鎖定將該值+1,每次解除讀鎖定將該值-1,
所以readerCount
取值為[0, N],N為讀者個數,實際上最大可支援2^30個併發讀者。
當寫鎖定進行時,會先將readerCount
減去2^30,從而readerCount
變成了負值,此時再有讀鎖定到來時檢測到readerCount為負值,便知道有寫操作
在進行,只好阻塞等待。而真實的讀操作個數並不會丟失,只需要將readerCount
寫操作通過readerCount的操作來阻止讀操作的
RUnlock
// 減少讀操作計數,即readerCount--
// 喚醒等待寫操作的協程(如果有的話)
func (rw *RWMutex) RUnlock() {
// 是否開啟檢測race
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 首先通過atomic的原子性使readerCount-1
// 1.若readerCount大於0, 證明當前還有讀鎖, 直接結束本次操作
// 2.若readerCount小於等於0, 證明已經沒有讀鎖, 可以喚醒寫鎖(若有)
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 將goroutine排到G佇列的後面,掛起goroutine
rw.rUnlockSlow(r)
}
// 是否開啟檢測race
if race.Enabled {
race.Enable()
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// readerWait--操作,如果有寫鎖,推出在寫鎖之前產生的讀鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
上面的讀操作解鎖,其中當沒有讀鎖的時候,是回去喚醒寫鎖的,那麼讀鎖是如何阻塞寫鎖的呢?
讀鎖定會先將RWMutext.readerCount
加1,此時寫操作到來時發現讀者數量不為0,會阻塞等待所有讀操作結束。
也就是說,讀操作通過readerCount
來將來阻止寫操作的。
Lock
// 獲取互斥鎖
// 阻塞等待所有讀操作結束(如果有的話)
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 獲取互斥鎖
rw.w.Lock()
// readerCount-rwmutexMaxReaders小於0, 再加回rwmutexMaxReaders
// 若r仍然不為0, 代表當前還有讀鎖
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 等待寫鎖前面的讀鎖釋放, 所以若不為0就阻塞寫鎖, 等待rUnlockSlow-rUnlockSlow的readerWait-1直至0倍喚醒寫鎖
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 阻塞寫鎖
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
RWMutex.readerWait
的作用
我們知道,寫操作要等待讀操作結束後才可以獲得鎖,寫操作等待期間可能還有新的讀操作持續到來,如果寫操作等待所有讀操作結束,很可能被餓死。然而,
通過RWMutex.readerWait
可完美解決這個問題。
寫操作到來時,會把RWMutex.readerCount
值拷貝到RWMutex.readerWait
中,用於標記排在寫操作前面的讀者個數。
前面的讀操作結束後,除了會遞減RWMutex.readerCount
,還會遞減RWMutex.readerWait
值,當RWMutex.readerWait
值變為0時喚醒寫操作。
寫操作之後產生的讀操作就會加入到readerCount
,阻塞知道寫鎖釋放。
Unlock
// 喚醒因讀鎖定而被阻塞的協程(如果有的話)
// 解除互斥鎖
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 增加readerCount, 若超過讀鎖的最大限制, 觸發panic
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
//解除阻塞的讀鎖(若有)
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 釋放互斥鎖
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
問題要論
寫操作是如何阻止寫操作的
讀寫鎖包含一個互斥鎖(Mutex),寫鎖定必須要先獲取該互斥鎖,如果互斥鎖已被協程A獲取(或者協程A在阻塞等待讀結束),意味著協程A獲取了互斥鎖,那麼協程B只能阻塞等待該互斥鎖。
所以,寫操作依賴互斥鎖阻止其他的寫操作。
寫操作是如何阻止讀操作的
我們知道RWMutex.readerCount
是個整型值,用於表示讀者數量,不考慮寫操作的情況下,每次讀鎖定將該值+1,每次解除讀鎖定將該值-1,所以readerCount取值為[0, N],N為讀者個數,實際上最大可支援2^30個併發讀者。
當寫鎖定進行時,會先將readerCount減去2^30,從而readerCount變成了負值,此時再有讀鎖定到來時檢測到readerCount為負值,便知道有寫操作在進行,只好阻塞等待。而真實的讀操作個數並不會丟失,只需要將readerCount加上2^30即可獲得。
所以,寫操作將readerCount變成負值來阻止讀操作的。
讀操作是如何阻止寫操作的
寫操作到來時,會把RWMutex.readerCount
值拷貝到RWMutex.readerWait
中,用於標記排在寫操作前面的讀者個數。
前面的讀操作結束後,除了會遞減RWMutex.readerCount
,還會遞減RWMutex.readerWait
值,當RWMutex.readerWait
值變為0時喚醒寫操作。
為什麼寫鎖定不會被餓死
我們知道,寫操作要等待讀操作結束後才可以獲得鎖,寫操作等待期間可能還有新的讀操作持續到來,如果寫操作等待所有讀操作結束,很可能被餓死。然而,通過RWMutex.readerWait
可完美解決這個問題。
寫操作到來時,會把RWMutex.readerCount
值拷貝到RWMutex.readerWait
中,用於標記排在寫操作前面的讀者個數。
前面的讀操作結束後,除了會遞減RWMutex.readerCount
,還會遞減RWMutex.readerWait
值,當RWMutex.readerWait
值變為0時喚醒寫操作。
參考
【Package race】https://golang.org/pkg/internal/race/
【sync.RWMutex原始碼分析】http://liangjf.top/2020/07/20/141.sync.RWMutex原始碼分析/
【剖析Go的讀寫鎖】http://zablog.me/2017/09/27/go_sync/
【《Go專家程式設計》GO 讀寫鎖實現原理剖析】https://my.oschina.net/renhc/blog/2878292