1. 程式人生 > >go中sync.Mutex原始碼解讀

go中sync.Mutex原始碼解讀

- [互斥鎖](#%E4%BA%92%E6%96%A5%E9%94%81) - [前言](#%E5%89%8D%E8%A8%80) - [什麼是sync.Mutex](#%E4%BB%80%E4%B9%88%E6%98%AFsyncmutex) - [分析下原始碼](#%E5%88%86%E6%9E%90%E4%B8%8B%E6%BA%90%E7%A0%81) - [Lock](#lock) - [位運算](#%E4%BD%8D%E8%BF%90%E7%AE%97) - [Unlock](#unlock) - [總結](#%E6%80%BB%E7%BB%93) - [參考](#%E5%8F%82%E8%80%83) ## 互斥鎖 ### 前言 本次的程式碼是基於`go version go1.13.15 darwin/amd64` ### 什麼是sync.Mutex `sync.Mutex`是Go標準庫中常用的一個排外鎖。當一個`goroutine`獲得了這個鎖的擁有權後, 其它請求鎖的`goroutine`就會阻塞在`Lock`方法的呼叫上,直到鎖被釋放。 ```go var ( mu sync.Mutex balance int ) func main() { Deposit(1) fmt.Println(Balance()) } func Deposit(amount int) { mu.Lock() balance = balance + amount mu.Unlock() } func Balance() int { mu.Lock() b := balance mu.Unlock() return b } ``` 使用起來很簡單,對需要鎖定的資源,前面加`Lock()`鎖定,完成的時候加`Unlock()`解鎖就好了。 ### 分析下原始碼 ```go const ( // mutex is locked // 是否加鎖的標識 mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota // 公平鎖 // // 鎖有兩種模式:正常模式和飢餓模式。 // 在正常模式下,所有的等待鎖的goroutine都會存在一個先進先出的佇列中(輪流被喚醒) // 但是一個被喚醒的goroutine並不是直接獲得鎖,而是仍然需要和那些新請求鎖的(new arrivial) // 的goroutine競爭,而這其實是不公平的,因為新請求鎖的goroutine有一個優勢——它們正在CPU上 // 執行,並且數量可能會很多。所以一個被喚醒的goroutine拿到鎖的概率是很小的。在這種情況下, // 這個被喚醒的goroutine會加入到佇列的頭部。如果一個等待的goroutine有超過1ms(寫死在程式碼中) // 都沒獲取到鎖,那麼就會把鎖轉變為飢餓模式。 // // 在飢餓模式中,鎖的所有權會直接從釋放鎖(unlock)的goroutine轉交給佇列頭的goroutine, // 新請求鎖的goroutine就算鎖是空閒狀態也不會去獲取鎖,並且也不會嘗試自旋。它們只是排到佇列的尾部。 // // 如果一個goroutine獲取到了鎖之後,它會判斷以下兩種情況: // 1. 它是佇列中最後一個goroutine; // 2. 它拿到鎖所花的時間小於1ms; // 以上只要有一個成立,它就會把鎖轉變回正常模式。 // 正常模式會有比較好的效能,因為即使有很多阻塞的等待鎖的goroutine, // 一個goroutine也可以嘗試請求多次鎖。 // 飢餓模式對於防止尾部延遲來說非常的重要。 starvationThresholdNs = 1e6 ) // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { // mutex鎖當前的狀態 state int32 // 訊號量,用於喚醒goroutine sema uint32 } ``` 重點開看下`state`的幾種狀態: 大神寫程式碼的思路就是驚奇,這裡`state`又運用到了位移的操作 - mutexLocked 對應右邊低位第一個bit 1 代表鎖被佔用 0代表鎖空閒 - mutexWoken 對應右邊低位第二個bit 1 表示已喚醒 0表示未喚醒 - mutexStarving 對應右邊低位第三個bit 1 代表鎖處於飢餓模式 0代表鎖處於正常模式 - mutexWaiterShift 值為3,根據 `mutex.state >
> mutexWaiterShift` 得到當前阻塞的`goroutine`數目,最多可以阻塞`2^29`個`goroutine`。 - starvationThresholdNs 值為1e6納秒,也就是1毫秒,當等待佇列中隊首g`oroutine`等待時間超過`starvationThresholdNs`也就是1毫秒,mutex進入飢餓模式。 #### Lock 加鎖基本上就這三種情況: 1、可直接獲取鎖,直接加鎖,返回; 2、有衝突 首先自旋,如果其他`goroutine`在這段時間內釋放了該鎖,直接獲得該鎖;如果沒有就走到下面3; 3、有衝突,且已經過了自旋階段,通過訊號量進行阻塞; - 1、剛被喚醒的 加入到等待佇列首部; - 2、新加入的 加入到等待佇列的尾部。 4、有衝突,根據不同的模式做處理; - 1、飢餓模式 獲取鎖 - 2、正常模式 喚醒,繼續迴圈,回到2 ```go // Lock locks m. // 如果鎖正在使用中,新的goroutine請求,將被阻塞,直到鎖被釋放 func (m *Mutex) Lock() { // 原子的(cas)來判斷是否加鎖 // 如果可以獲取鎖,直接加鎖,返回 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // 這把鎖,已經被別的goroutine持有 m.lockSlow() } func (m *Mutex) lockSlow() { var waitStartTime int64 // 是否處於飢餓模式 starving := false // 用來存當前goroutine是否已喚醒 awoke := false // 用來存當前goroutine的迴圈次數 iter := 0 // 記錄下當前的狀態 old := m.state for { // 第一個條件是state已被鎖,但是不是飢餓狀態。如果時飢餓狀態,自旋時沒有用的,鎖的擁有權直接交給了等待佇列的第一個。 // 第二個條件是還可以自旋,多核、壓力不大並且在一定次數內可以自旋, 具體的條件可以參考`sync_runtime_canSpin`的實現。 // 如果滿足這兩個條件,不斷自旋來等待鎖被釋放、或者進入飢餓狀態、或者不能再自旋。 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 自旋的過程中如果發現state還沒有設定woken標識,則設定它的woken標識, 並標記自己為被喚醒。 if !awoke && old&mutexWoken == 0 && old>
>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 主動自旋 runtime_doSpin() // 迴圈次數加一 iter++ old = m.state continue } // 到了這一步, state的狀態可能是: // 1. 鎖還沒有被釋放,鎖處於正常狀態 // 2. 鎖還沒有被釋放, 鎖處於飢餓狀態 // 3. 鎖已經被釋放, 鎖處於正常狀態 // 4. 鎖已經被釋放, 鎖處於飢餓狀態 // new 複製 state的當前狀態, 用來設定新的狀態 // old 是鎖當前的狀態 new := old // 如果old state狀態不是飢餓狀態, new state 設定鎖, 嘗試通過CAS獲取鎖, // 如果old state狀態是飢餓狀態, 則不設定new state的鎖,因為飢餓狀態下鎖直接轉給等待佇列的第一個. if old&mutexStarving == 0 { // 虛擬碼:newState = locked new |= mutexLocked } // 如果鎖是被獲取狀態,或者飢餓狀態 // 就把期望狀態中的等待佇列的等待者數量+1(實際上是new + 8) if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 如果當前goroutine已經處於飢餓狀態, 並且old state的已被加鎖, // 將new state的狀態標記為飢餓狀態, 將鎖轉變為飢餓狀態. if starving && old&mutexLocked != 0 { // 設定為飢餓狀態 new |= mutexStarving } if awoke { // goroutine已從睡眠中喚醒, // 因此,無論哪種情況,我們都需reset if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 設定new設定為非喚醒狀態 // &^的意思是and not new &^= mutexWoken } // 原子(cas)更新state的狀態 // 注意new的鎖標記不一定是true, 也可能只是標記一下鎖的state是飢餓狀態. if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果說old狀態不是飢餓狀態也不是被獲取狀態 // 那麼代表當前goroutine已經通過CAS成功獲取了鎖 if old&(mutexLocked|mutexStarving) == 0 { // 直接break break // locked the mutex with CAS } // 如果我們之前已經在等了,那就排在隊伍前面。 queueLifo := waitStartTime != 0 // 如果說之前沒有等待過,就初始化設定現在的等待時間 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // queueLifo為true,也就是之前已經在等了 // runtime_SemacquireMutex中的lifo為true,則將等待服務程式放在等待佇列的開頭。 // 會被阻塞 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞被喚醒 // 如果當前goroutine已經是飢餓狀態了 // 或者當前goroutine已經等待了1ms(在上面定義常量)以上 // 就把當前goroutine的狀態設定為飢餓 starving = starving || runtime_nanotime()-waitStartTime >
starvationThresholdNs old = m.state // 如果是飢餓模式 if old&mutexStarving != 0 { // 如果goroutine被喚醒,互斥鎖處於飢餓模式 // 鎖的所有權轉移給當前goroutine,但是鎖處於不一致的狀態中:mutexLocked沒有設定 // 並且我們將仍然被認為是waiter。這個狀態需要被修復。 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 當前goroutine獲取鎖,waiter數量-1 delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 { // 退出飢餓模式 // 在這裡這麼做至關重要,還要考慮等待時間。 // 飢餓模式是非常低效率的,一旦兩個goroutine將互斥鎖切換為飢餓模式,它們便可以無限鎖。 delta -= mutexStarving } // 原子的加上更新的值 atomic.AddInt32(&m.state, delta) break } // 不是飢餓模式,就把當前的goroutine設為被喚醒 awoke = true // 重置迴圈的次數 iter = 0 } else { // 如果CAS不成功,也就是說沒能成功獲得鎖,鎖被別的goroutine獲得了或者鎖一直沒被釋放 // 那麼就更新狀態,重新開始迴圈嘗試拿鎖 old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } const ( active_spin = 4 ) // src/runtime/proc.go // Active spinning for sync.Mutex. // go:linkname sync_runtime_canSpin sync.runtime_canSpin // go:nosplit func sync_runtime_canSpin(i int) bool { // sync.Mutex是會被多個goroutine競爭的,所以自旋的次數需要控制 // active_spin的值為4 // 滿足下面的新增才會發生自旋 // 1、自旋的次數小於active_spin也就是4 // 2、如果在單核的cpu是不能自旋的 // 3、 GOMAXPROCS> 1,並且至少有一個其他正在執行的P,並且本地runq為空。 // 4、當前P沒有其它等待執行的G if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true } // src/runtime/proc.go // go:linkname sync_runtime_doSpin sync.runtime_doSpin // go:nosplit // procyield的實現是用匯編實現的 func sync_runtime_doSpin() { procyield(active_spin_cnt) } // src/runtime/asm_amd64.s TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: // 讓加鎖失敗時cpu睡眠30個(about)clock,從而使得讀操作的頻率低很多。流水線重排的代價也會小很多 PAUSE SUBL $1, AX JNZ again RET ``` 梳理下流程 1、原子的(cas)來判斷是否加鎖,如果之前鎖沒有被使用,當前`goroutine`獲取鎖,結束本次`Lock`操作; 2、如果已經被別的`goroutine`持有了,啟動一個for迴圈去搶佔鎖; 會存在兩種狀態的切換 飢餓狀態和正常狀態 如果一個等待的goroutine有超過1ms(寫死在程式碼中)都沒獲取到鎖,那麼就會把鎖轉變為飢餓模式 如果一個goroutine獲取到了鎖之後,它會判斷以下兩種情況: - 1、它是佇列中最後一個goroutine; - 2、它拿到鎖所花的時間小於1ms; 以上只要有一個成立,它就會把鎖轉變回正常模式。 3、如果鎖已經被鎖了,並且不是飢餓狀態,並且滿足自旋的條件,當前goroutine會不斷的進行自旋,等待鎖被釋放; 4、不滿足鎖自旋的條件,然後結束自旋,這是當前鎖的狀態可能有下面幾種情況: - 1、鎖還沒有被釋放,鎖處於正常狀態 - 2、鎖還沒有被釋放, 鎖處於飢餓狀態 - 3、鎖已經被釋放, 鎖處於正常狀態 - 4、鎖已經被釋放, 鎖處於飢餓狀態 5、如果`old.state`不是飢餓狀態,新的`goroutine`嘗試去獲鎖,如果是飢餓狀態,就直接將鎖直接轉給等待佇列的第一個; 6、如果鎖是被獲取或飢餓狀態,等待者的數量加一; 7、當本`goroutine`被喚醒了,要麼獲得了鎖,要麼進入休眠; 8、如果`old state`的狀態是未被鎖狀態,並且鎖不處於飢餓狀態,那麼當前`goroutine`已經獲取了鎖的擁有權,結束`Lock`; 9、判斷一下當前`goroutine`是新來的還是剛被喚醒的,新來的加入到等待佇列的尾部,剛被喚醒的加入到等待佇列的頭部,然後通過訊號量阻塞,直到當前`goroutine`被喚醒; 10、判斷如果當前`state`是否是飢餓狀態,不是的喚醒本次`goroutine`,繼續迴圈,是飢餓狀態繼續往下面走; 11、飢餓狀態,當前`goroutine`來設定鎖,等待者減一,如果當前`goroutine`是佇列中最後一個`goroutine`設定飢餓狀態為正常,拿到鎖結束`Lock`。 ##### 位運算 上面有很多關於&和|的運算和判斷,下面來具體的分析下 ``` & 位運算 AND | 位運算 OR ^ 位運算 XOR &^ 位清空(AND NOT) << 左移 >> 右移 ``` **&** 參與運算的兩數各對應的二進位相與,兩個二進位制位都為1時,結果才為1 ``` 0101 AND 0011 = 0001 ``` **|** 參與運算的兩數各對應的二進位相或,兩個二進位制位都為1時,結果才為0 ```go 0101(十進位制5) OR 0011(十進位制3) = 0111(十進位制7) ``` **^** 按位異或運算,對等長二進位制模式或二進位制數的每一位執行邏輯異或操作。操作的結果是如果某位不同則該位為1,否則該位為0。 ``` 0101 XOR 0011 = 0110 ``` **&^** 將運算子左邊資料相異的位保留,相同位清零 ``` 0001 0100 &^ 0000 1111 = 0001 0000 ``` **<<** 各二進位全部左移若干位,高位丟棄,低位補0 ``` 0001(十進位制1) << 3(左移3位) = 1000(十進位制8) ``` **>>** 各二進位全部右移若干位,對無符號數,高位補0,有符號數,各編譯器處理方法不一樣,有的補符號位(算術右移),有的補0 ``` 1010(十進位制10) >> 2(右移2位) = 0010(十進位制2) ``` #### Unlock ```go // Unlock unlocks m. // 如果沒有lock就去unlocak是會報錯的 // //一個鎖定的互斥鎖與一個特定的goroutine沒有關聯。 // 它允許一個goroutine鎖定一個互斥鎖然後 // 安排另一個goroutine解鎖它。 func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 修改state的狀態 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 不為0,說明沒有成功解鎖 m.unlockSlow(new) } } func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // 如果說鎖沒有等待拿鎖的goroutine // 或者鎖被獲取了(在迴圈的過程中被其它goroutine獲取了) // 或者鎖是被喚醒狀態(表示有goroutine被喚醒,不需要再去嘗試喚醒其它goroutine) // 或者鎖是飢餓模式(會直接轉交給佇列頭的goroutine) // 那麼就直接返回,啥都不用做了 // 也就是沒有等待的goroutine, 或者鎖不處於空閒的狀態,直接返回. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 走到這一步的時候,說明鎖目前還是空閒狀態,並且沒有goroutine被喚醒且佇列中有goroutine等待拿鎖 // 將等待的goroutine數減一,並設定woken標識 new = (old - 1< ### 總結 1、加鎖的過程會存在正常模式和互斥模式的轉換; 2、飢餓模式就是保證鎖的公平性,正常模式下的互斥鎖能夠提供更好地效能,飢餓模式的能避免 Goroutine 由於陷入等待無法獲取鎖而造成的高尾延時; 3、鎖的狀態的轉換,也使用到了位運算; 4、一個已經鎖定的互斥鎖,允許其他協程進行解鎖,不過只能被解鎖一次; ### 參考 【sync.Mutex 原始碼分析】https://reading.hidevops.io/articles/sync/sync_mutex_source_code_analysis/ 【一份詳細註釋的go Mutex原始碼】http://cbsheng.github.io/posts/%E4%B8%80%E4%BB%BD%E8%AF%A6%E7%BB%86%E6%B3%A8%E9%87%8A%E7%9A%84go-mutex%E6%BA%90%E7%A0%81/ 【原始碼剖析 golang 中 sync.Mutex】https://www.purewhite.io/2019/03/28/golang-mutex-source/ 【sync.mutex 原始碼分析】https://colobu.com/2018/12/18/dive-into-sync-mutex/ 【原始碼剖析 golang 中 sync.Mutex】https://www.purewhite.io/2019/03/28/golang-mutex-source/ >**本文作者**:liz >**本文連結**:https://boilingfrog.github.io/2021/03/14/sync.Mutex/ >**版權宣告**:本文為博主原創文章,遵循 [CC 4.0 BY-SA](https://creativecommons.org/licenses/by-sa/4.0/) 版權協議,轉載請附上原文出處連結和本聲