1. 程式人生 > 實用技巧 >go中的讀寫鎖使用小結

go中的讀寫鎖使用小結

讀寫鎖

基本結構

寫鎖需要阻塞寫鎖:一個協程擁有寫鎖時,其他協程寫鎖定需要阻塞
寫鎖需要阻塞讀鎖:一個協程擁有寫鎖時,其他協程讀鎖定需要阻塞
讀鎖需要阻塞寫鎖:一個協程擁有讀鎖時,其他協程寫鎖定需要阻塞
讀鎖不能阻塞讀鎖:一個協程擁有讀鎖時,其他協程也可以擁有讀鎖

RWMutex提供4個簡單的介面來提供服務:

RLock():讀鎖定
RUnlock():解除讀鎖定
Lock(): 寫鎖定,與Mutex完全一致
Unlock():解除寫鎖定,與Mutex完全一致

看下具體的實現

type RWMutex struct {
	w           Mutex  // 用於控制多個寫鎖,獲得寫鎖首先要獲取該鎖,如果有一個寫鎖在進行,那麼再到來的寫鎖將會阻塞於此
	writerSem   uint32 // 寫阻塞等待的訊號量,最後一個讀者釋放鎖時會釋放訊號量
	readerSem   uint32 // 讀阻塞的協程等待的訊號量,持有寫鎖的協程釋放鎖後會釋放訊號量
	readerCount int32  // 記錄讀者個數
	readerWait  int32  // 記錄寫阻塞時讀者個數
}

RLock

// 讀加鎖
// 增加讀操作計數,即readerCount++
// 阻塞等待寫操作結束(如果有的話)
func (rw *RWMutex) RLock() {
    // 競態檢測
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
    // 首先通過atomic的原子性使readerCount+1
    // 1、如果readerCount<0。說明寫鎖已經獲取了,那麼這個讀鎖需要等待寫鎖的完成
    // 2、如果readerCount>=0。當前讀直接獲取鎖
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 當前有個寫鎖, 讀操作阻塞等待寫鎖釋放
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
    // 是否開啟檢測race
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

上面的通過判斷readerCount,來判斷是否有寫鎖,為什麼呢?

寫操作是如何阻止讀操作的

這個是讀寫鎖實現中最精華的技巧。

我們知道RWMutex.readerCount是個整型值,用於表示讀者數量,不考慮寫操作的情況下,每次讀鎖定將該值+1,每次解除讀鎖定將該值-1,
所以readerCount取值為[0, N],N為讀者個數,實際上最大可支援2^30個併發讀者。

當寫鎖定進行時,會先將readerCount減去2^30,從而readerCount變成了負值,此時再有讀鎖定到來時檢測到readerCount為負值,便知道有寫操作
在進行,只好阻塞等待。而真實的讀操作個數並不會丟失,只需要將readerCount

加上2^30即可獲得。

寫操作通過readerCount的操作來阻止讀操作的

RUnlock

// 減少讀操作計數,即readerCount--
// 喚醒等待寫操作的協程(如果有的話)
func (rw *RWMutex) RUnlock() {
    // 是否開啟檢測race
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
    // 首先通過atomic的原子性使readerCount-1
    // 1.若readerCount大於0, 證明當前還有讀鎖, 直接結束本次操作
    // 2.若readerCount小於等於0, 證明已經沒有讀鎖, 可以喚醒寫鎖(若有)
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// 將goroutine排到G佇列的後面,掛起goroutine
		rw.rUnlockSlow(r)
	}
    // 是否開啟檢測race
	if race.Enabled {
		race.Enable()
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// readerWait--操作,如果有寫鎖,推出在寫鎖之前產生的讀鎖
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

上面的讀操作解鎖,其中當沒有讀鎖的時候,是回去喚醒寫鎖的,那麼讀鎖是如何阻塞寫鎖的呢?

讀鎖定會先將RWMutext.readerCount加1,此時寫操作到來時發現讀者數量不為0,會阻塞等待所有讀操作結束。

也就是說,讀操作通過readerCount來將來阻止寫操作的。

Lock

// 獲取互斥鎖
// 阻塞等待所有讀操作結束(如果有的話)
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// 獲取互斥鎖
	rw.w.Lock()
    // readerCount-rwmutexMaxReaders小於0, 再加回rwmutexMaxReaders
	// 若r仍然不為0, 代表當前還有讀鎖
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 等待寫鎖前面的讀鎖釋放, 所以若不為0就阻塞寫鎖, 等待rUnlockSlow-rUnlockSlow的readerWait-1直至0倍喚醒寫鎖
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        // 阻塞寫鎖
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

RWMutex.readerWait的作用

我們知道,寫操作要等待讀操作結束後才可以獲得鎖,寫操作等待期間可能還有新的讀操作持續到來,如果寫操作等待所有讀操作結束,很可能被餓死。然而,
通過RWMutex.readerWait可完美解決這個問題。

寫操作到來時,會把RWMutex.readerCount值拷貝到RWMutex.readerWait中,用於標記排在寫操作前面的讀者個數。

前面的讀操作結束後,除了會遞減RWMutex.readerCount,還會遞減RWMutex.readerWait值,當RWMutex.readerWait值變為0時喚醒寫操作。

寫操作之後產生的讀操作就會加入到readerCount,阻塞知道寫鎖釋放。

Unlock

// 喚醒因讀鎖定而被阻塞的協程(如果有的話)
// 解除互斥鎖
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// 增加readerCount, 若超過讀鎖的最大限制, 觸發panic
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	//解除阻塞的讀鎖(若有)
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 釋放互斥鎖
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}

問題要論

寫操作是如何阻止寫操作的

讀寫鎖包含一個互斥鎖(Mutex),寫鎖定必須要先獲取該互斥鎖,如果互斥鎖已被協程A獲取(或者協程A在阻塞等待讀結束),意味著協程A獲取了互斥鎖,那麼協程B只能阻塞等待該互斥鎖。

所以,寫操作依賴互斥鎖阻止其他的寫操作。

寫操作是如何阻止讀操作的

我們知道RWMutex.readerCount是個整型值,用於表示讀者數量,不考慮寫操作的情況下,每次讀鎖定將該值+1,每次解除讀鎖定將該值-1,所以readerCount取值為[0, N],N為讀者個數,實際上最大可支援2^30個併發讀者。

當寫鎖定進行時,會先將readerCount減去2^30,從而readerCount變成了負值,此時再有讀鎖定到來時檢測到readerCount為負值,便知道有寫操作在進行,只好阻塞等待。而真實的讀操作個數並不會丟失,只需要將readerCount加上2^30即可獲得。

所以,寫操作將readerCount變成負值來阻止讀操作的。

讀操作是如何阻止寫操作的

寫操作到來時,會把RWMutex.readerCount值拷貝到RWMutex.readerWait中,用於標記排在寫操作前面的讀者個數。

前面的讀操作結束後,除了會遞減RWMutex.readerCount,還會遞減RWMutex.readerWait值,當RWMutex.readerWait值變為0時喚醒寫操作。

為什麼寫鎖定不會被餓死

我們知道,寫操作要等待讀操作結束後才可以獲得鎖,寫操作等待期間可能還有新的讀操作持續到來,如果寫操作等待所有讀操作結束,很可能被餓死。然而,通過RWMutex.readerWait可完美解決這個問題。

寫操作到來時,會把RWMutex.readerCount值拷貝到RWMutex.readerWait中,用於標記排在寫操作前面的讀者個數。

前面的讀操作結束後,除了會遞減RWMutex.readerCount,還會遞減RWMutex.readerWait值,當RWMutex.readerWait值變為0時喚醒寫操作。

參考

【Package race】https://golang.org/pkg/internal/race/
【sync.RWMutex原始碼分析】http://liangjf.top/2020/07/20/141.sync.RWMutex原始碼分析/
【剖析Go的讀寫鎖】http://zablog.me/2017/09/27/go_sync/
【《Go專家程式設計》GO 讀寫鎖實現原理剖析】https://my.oschina.net/renhc/blog/2878292