1. 程式人生 > >多圖詳解Go的互斥鎖Mutex

多圖詳解Go的互斥鎖Mutex

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的go的原始碼時14.4 ## Mutex介紹 Mutex 結構體包含兩個欄位: * 欄位state:表示當前互斥鎖的狀態。 * 欄位 sema:是個訊號量變數,用來控制等待 goroutine 的阻塞休眠和喚醒。 ```go type Mutex struct { state int32 sema uint32 } ``` 在Go的1.9版本中,為了解決等待中的 goroutine 可能會一直獲取不到鎖,增加了飢餓模式,讓鎖變得更公平,不公平的等待時間限制在 1 毫秒。 state狀態欄位所表示的含義較為複雜,如下圖所示,最低三位分別表示mutexLocked、mutexWoken、mutexStarving,state總共是32位長度,所以剩下的位置,用來表示可以有1<<(32-3)個Goroutine 等待互斥鎖的釋放: ![Group 1](https://img.luozhiyun.com/20201218225206.png) 程式碼表示如下: ```go const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving ) ``` ## 加鎖流程 ### fast path ```go func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } m.lockSlow() } ``` 加鎖的時候,一開始會通過CAS看一下能不能直接獲取鎖,如果可以的話,那麼直接獲取鎖成功。 ### lockSlow ```go // 等待時間 var waitStartTime int64 // 飢餓標記 starving := false // 喚醒標記 awoke := false // 自旋次數 iter := 0 // 當前的鎖的狀態 old := m.state for { // 鎖是非飢餓狀態,鎖還沒被釋放,嘗試自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>
>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 自旋 runtime_doSpin() // 自旋次數加1 iter++ // 設定當前鎖的狀態 old = m.state continue } ... } ``` 進入到lockSlow方法之後首先會判斷以下能否可以自旋,判斷依據就是通過計算: ``` old&(mutexLocked|mutexStarving) == mutexLocked ``` 可以知道當前鎖的狀態必須是上鎖,並且不能處於飢餓狀態,這個判斷才為true,然後再看看iter是否滿足次數的限制,如果都為true,那麼則往下繼續。 內層if包含了四個判斷: * 首先判斷了awoke是不是喚醒狀態; * `old&mutexWoken == 0`為真表示沒有其他正在喚醒的節點; * `old>
>mutexWaiterShift != 0`表明當前有正在等待的goroutine; * CAS將state的mutexWoken狀態位設定為`old|mutexWoken`,即為1是否成功。 如果都滿足,那麼將awoke狀態設定為真,然後將自旋次數加一,並重新設定狀態。 繼續往下看: ```go new := old if old&mutexStarving == 0 { // 如果當前不是飢餓模式,那麼將mutexLocked狀態位設定1,表示加鎖 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 如果當前被鎖定或者處於飢餓模式,則waiter加一,表示等待一個等待計數 new += 1 << mutexWaiterShift } // 如果是飢餓狀態,並且已經上鎖了,那麼mutexStarving狀態位設定為1,設定為飢餓狀態 if starving && old&mutexLocked != 0 { new |= mutexStarving } // awoke為true則表明當前執行緒在上面自旋的時候,修改mutexWoken狀態成功 if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 清除喚醒標誌位 new &^= mutexWoken } ``` 走到這裡有兩種情況:1. 自旋超過了次數;2. 目前鎖沒有被持有。 所以第一個判斷,如果當前加了鎖,但是沒有處於飢餓狀態,也會重複設定`new |= mutexLocked`,即將mutexLocked狀態設定為1; 如果是old已經是飢餓狀態或者已經被上鎖了,那麼需要設定Waiter加一,表示這個goroutine下面不會獲取鎖,會等待; 如果starving為真,表示當前goroutine是飢餓狀態,並且old已經被上鎖了,那麼設定`new |= mutexStarving`,即將mutexStarving狀態位設定為1; awoke如果在自旋時設定成功,那麼在這裡要`new &^= mutexWoken`消除mutexWoken標誌位。因為後續流程很有可能當前執行緒會被掛起,就需要等待其他釋放鎖的goroutine來喚醒,如果unlock的時候發現mutexWoken的位置不是0,則就不會去喚醒,則該執行緒就無法再醒來加鎖。 繼續往下: ```go if atomic.CompareAndSwapInt32(&m.state, old, new) { // 1.如果原來狀態沒有上鎖,也沒有飢餓,那麼直接返回,表示獲取到鎖 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 2.到這裡是沒有獲取到鎖,判斷一下等待時長是否不為0 // 如果不為0,那麼加入到佇列頭部 queueLifo := waitStartTime != 0 // 3.如果等待時間為0,那麼初始化等待時間 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 4.阻塞等待 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 5.喚醒之後檢查鎖是否應該處於飢餓狀態 starving = starving || runtime_nanotime()-waitStartTime >
starvationThresholdNs old = m.state // 6.判斷是否已經處於飢餓狀態 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 7.加鎖並且將waiter數減1 delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 { // 8.如果當前goroutine不是飢餓狀態,就從飢餓模式切換會正常模式 delta -= mutexStarving } // 9.設定狀態 atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } ``` 到這裡,首先會CAS設定新的狀態,如果設定成功則往下走,否則返回之後迴圈設定狀態。設定成功之後: 1. 首先會判斷old狀態,如果沒有飢餓,也沒有獲取到鎖,那麼直接返回,因為這種情況在進入到這段程式碼之前會將new狀態設定為mutexLocked,表示已經獲取到鎖。這裡還判斷了一下old狀態不能為飢餓狀態,否則也不能獲取到鎖; 2. 判斷waitStartTime是否已經初始化過了,如果是新的goroutine來搶佔鎖,那麼queueLifo會返回false;如果不是新的goroutine來搶佔鎖,那麼加入到等待佇列頭部,這樣等待最久的 goroutine 優先能夠獲取到鎖; 3. 如果等待時間為0,那麼初始化等待時間; 4. 阻塞等待,當前goroutine進行休眠; 5. 喚醒之後檢查鎖是否應該處於飢餓狀態,並設定starving變數值; 6. 判斷是否已經處於飢餓狀態,如果不處於飢餓狀態,那麼這裡直接進入到下一個for迴圈中獲取鎖; 7. 加鎖並且將waiter數減1,這裡我看了一會,沒用懂什麼意思,其實需要分兩步來理解,相當於state+mutexLocked,然後state再將waiter部分的數減一; 8. 如果當前goroutine不是飢餓狀態或者waiter只有一個,就從飢餓模式切換會正常模式; 9. 設定狀態; 下面用圖例來解釋: 這部分的圖解是休眠前的操作,休眠前會根據old的狀態來判斷能不能直接獲取到鎖,如果old狀態沒有上鎖,也沒有飢餓,那麼直接break返回,因為這種情況會在CAS中設定加上鎖; 接著往下判斷,waitStartTime是否等於0,如果不等於,說明不是第一次來了,而是被喚醒後來到這裡,那麼就不能直接放到隊尾再休眠了,而是要放到隊首,防止長時間搶不到鎖; ![Group 5](https://img.luozhiyun.com/20201218225213.png) 下面這張圖是處於喚醒後的示意圖,如何被喚醒的可以直接到跳到解鎖部分看完再回來。 被喚醒一開始是需要判斷一下當前的starving狀態以及等待的時間如果超過了1ms,那麼會將starving設定為true; 接下來會有一個if判斷, 這裡有個細節,因為是被喚醒的,所以判斷前需要重新獲取一下鎖,如果當前不是飢餓模式,那麼會直接返回,然後重新進入到for迴圈中; 如果當前是處於飢餓模式,那麼會計算一下delta為加鎖,並且當前的goroutine是可以直接搶佔鎖的,所以需要將waiter減一,如果starving不為飢餓,或者等待時間沒有超過1ms,或者waiter只有一個了,那麼還需要將delta減去mutexStarving,表示退出飢餓模式; 最後通過AddInt32將state加上delta,這裡之所以可以直接加上,因為這時候state的mutexLocked值肯定為0,並且mutexStarving位肯定為1,並且在獲取鎖之前至少還有當前一個goroutine在等待佇列中,所以waiter可以直接減1。 ![Group 6](https://img.luozhiyun.com/20201218225217.png) ## 解鎖流程 ### fast path ```go func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } //返回一個state被減後的值 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { //如果返回的state值不為0,那麼進入到unlockSlow中 m.unlockSlow(new) } } ``` 這裡主要就是AddInt32重新設定state的mutexLocked位為0,然後判斷新的state值是否不為0,不為0則呼叫unlockSlow方法。 ### unlockSlow ![Group 7](https://img.luozhiyun.com/20201218225221.png) unlockSlow方法裡面也分為正常模式和飢餓模式下的解鎖: ```go func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 正常模式 if new&mutexStarving == 0 { old := new for { // 如果沒有 waiter,或者已經有在處理的情況,直接返回 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // waiter 數減 1,mutexWoken 標誌設定上,通過 CAS 更新 state 的值 new = (old - 1<