golang Mutex 實現上的幾個巧妙的點
阿新 • • 發佈:2018-11-25
golang 的metux 的實現有幾個點做法是非常有意思的,一個是底層資料結構上,用了平時很少用的位運算,第二個,用到了自旋,並做了自旋策略控制,最後是用了訊號量控制協程。
首先是golang mutex 中用了很多位運算。位運算不做細介紹,對記憶體利用比較高的演算法都有涉及,比如redies 的壓縮列表,比如golang 的Protobuffer。
有幾個關鍵點,iota 在定義的時候,用的多,做自增運算:
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexWaiterShift = iota ) // 這裡 第一個變數為1 ,第二個變數為10, 第三個為10
然後,位運算的求或和求與用的很多,一個是與1 求或將某位置1,一個是與0 求與將某位置0,這些都是用於改變某些標誌位的方式,不要看懵逼了:
new := old | mutexLocked // 將old 的最後一位置1,表示new 鎖一定是被持有狀態 if old&mutexLocked != 0 { // 將最後一位保留後,其他位全部置0, 判斷最後一位的狀態是不是0,判斷是不是被持有 if runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ continue } new = old + 1<<mutexWaiterShift }
第二個是golang 的加鎖會自旋,4次自旋沒拿到鎖後再將協程休眠,這樣可以減少切換成本。這裡關判斷條件是自旋次數,cpu核數,p 的數量:
func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
最後是利用訊號量掛起和喚醒協程,核心函式是
runtime_SemacquireMutex(&m.sema)
runtime_Semrelease(&m.sema)
獲取訊號時,當s > 0 ,將s--,如果s 為負數,會將當前g 放入阻塞佇列,掛起直到s>0。
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return // cas 獲取到鎖,直接返回
}
awoke := false //迴圈標記
iter := 0 //迴圈計數器
for {
old := m.state //儲存當前鎖狀態
new := old | mutexLocked //將狀態位最後一位指定1
if old&mutexLocked != 0 { //鎖被佔用
if runtime_canSpin(iter) { //檢查是否可以進入自旋鎖,4次
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//awoke標記為true
awoke = true
}
runtime_doSpin()//進入自旋
iter++
continue
}
new = old + 1<<mutexWaiterShift //鎖被佔用,且自旋次數超過4次,掛起協程數+1,下面步驟將g 掛起並等待
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken //清除標誌
}
if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新協程計數
if old&mutexLocked == 0 {
break
}
// 鎖請求失敗,進入休眠狀態,等待訊號喚醒後重新開始迴圈,一直阻塞在這裡
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
解鎖的過程就和加鎖反過來即可:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)// 移除加鎖位
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
old := new
for {
//當休眠佇列內的等待計數為0或者自旋狀態計數器為0,退出
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// 等待協程數-1,更改清除標記位
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)// 釋放鎖,傳送釋放訊號,對應之前的acquire
return
}
old = m.state
}
}