多圖詳解Go的互斥鎖Mutex
阿新 • • 發佈:2020-12-19
> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com
>
> 本文使用的go的原始碼時14.4
## Mutex介紹
Mutex 結構體包含兩個欄位:
* 欄位state:表示當前互斥鎖的狀態。
* 欄位 sema:是個訊號量變數,用來控制等待 goroutine 的阻塞休眠和喚醒。
```go
type Mutex struct {
state int32
sema uint32
}
```
在Go的1.9版本中,為了解決等待中的 goroutine 可能會一直獲取不到鎖,增加了飢餓模式,讓鎖變得更公平,不公平的等待時間限制在 1 毫秒。
state狀態欄位所表示的含義較為複雜,如下圖所示,最低三位分別表示mutexLocked、mutexWoken、mutexStarving,state總共是32位長度,所以剩下的位置,用來表示可以有1<<(32-3)個Goroutine 等待互斥鎖的釋放:
![Group 1](https://img.luozhiyun.com/20201218225206.png)
程式碼表示如下:
```go
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)
```
## 加鎖流程
### fast path
```go
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
```
加鎖的時候,一開始會通過CAS看一下能不能直接獲取鎖,如果可以的話,那麼直接獲取鎖成功。
### lockSlow
```go
// 等待時間
var waitStartTime int64
// 飢餓標記
starving := false
// 喚醒標記
awoke := false
// 自旋次數
iter := 0
// 當前的鎖的狀態
old := m.state
for {
// 鎖是非飢餓狀態,鎖還沒被釋放,嘗試自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old> >mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
// 自旋次數加1
iter++
// 設定當前鎖的狀態
old = m.state
continue
}
...
}
```
進入到lockSlow方法之後首先會判斷以下能否可以自旋,判斷依據就是通過計算:
```
old&(mutexLocked|mutexStarving) == mutexLocked
```
可以知道當前鎖的狀態必須是上鎖,並且不能處於飢餓狀態,這個判斷才為true,然後再看看iter是否滿足次數的限制,如果都為true,那麼則往下繼續。
內層if包含了四個判斷:
* 首先判斷了awoke是不是喚醒狀態;
* `old&mutexWoken == 0`為真表示沒有其他正在喚醒的節點;
* `old> >mutexWaiterShift != 0`表明當前有正在等待的goroutine;
* CAS將state的mutexWoken狀態位設定為`old|mutexWoken`,即為1是否成功。
如果都滿足,那麼將awoke狀態設定為真,然後將自旋次數加一,並重新設定狀態。
繼續往下看:
```go
new := old
if old&mutexStarving == 0 {
// 如果當前不是飢餓模式,那麼將mutexLocked狀態位設定1,表示加鎖
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果當前被鎖定或者處於飢餓模式,則waiter加一,表示等待一個等待計數
new += 1 << mutexWaiterShift
}
// 如果是飢餓狀態,並且已經上鎖了,那麼mutexStarving狀態位設定為1,設定為飢餓狀態
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke為true則表明當前執行緒在上面自旋的時候,修改mutexWoken狀態成功
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除喚醒標誌位
new &^= mutexWoken
}
```
走到這裡有兩種情況:1. 自旋超過了次數;2. 目前鎖沒有被持有。
所以第一個判斷,如果當前加了鎖,但是沒有處於飢餓狀態,也會重複設定`new |= mutexLocked`,即將mutexLocked狀態設定為1;
如果是old已經是飢餓狀態或者已經被上鎖了,那麼需要設定Waiter加一,表示這個goroutine下面不會獲取鎖,會等待;
如果starving為真,表示當前goroutine是飢餓狀態,並且old已經被上鎖了,那麼設定`new |= mutexStarving`,即將mutexStarving狀態位設定為1;
awoke如果在自旋時設定成功,那麼在這裡要`new &^= mutexWoken`消除mutexWoken標誌位。因為後續流程很有可能當前執行緒會被掛起,就需要等待其他釋放鎖的goroutine來喚醒,如果unlock的時候發現mutexWoken的位置不是0,則就不會去喚醒,則該執行緒就無法再醒來加鎖。
繼續往下:
```go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原來狀態沒有上鎖,也沒有飢餓,那麼直接返回,表示獲取到鎖
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.到這裡是沒有獲取到鎖,判斷一下等待時長是否不為0
// 如果不為0,那麼加入到佇列頭部
queueLifo := waitStartTime != 0
// 3.如果等待時間為0,那麼初始化等待時間
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 4.阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 5.喚醒之後檢查鎖是否應該處於飢餓狀態
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 6.判斷是否已經處於飢餓狀態
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 7.加鎖並且將waiter數減1
delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 {
// 8.如果當前goroutine不是飢餓狀態,就從飢餓模式切換會正常模式
delta -= mutexStarving
}
// 9.設定狀態
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
```
到這裡,首先會CAS設定新的狀態,如果設定成功則往下走,否則返回之後迴圈設定狀態。設定成功之後:
1. 首先會判斷old狀態,如果沒有飢餓,也沒有獲取到鎖,那麼直接返回,因為這種情況在進入到這段程式碼之前會將new狀態設定為mutexLocked,表示已經獲取到鎖。這裡還判斷了一下old狀態不能為飢餓狀態,否則也不能獲取到鎖;
2. 判斷waitStartTime是否已經初始化過了,如果是新的goroutine來搶佔鎖,那麼queueLifo會返回false;如果不是新的goroutine來搶佔鎖,那麼加入到等待佇列頭部,這樣等待最久的 goroutine 優先能夠獲取到鎖;
3. 如果等待時間為0,那麼初始化等待時間;
4. 阻塞等待,當前goroutine進行休眠;
5. 喚醒之後檢查鎖是否應該處於飢餓狀態,並設定starving變數值;
6. 判斷是否已經處於飢餓狀態,如果不處於飢餓狀態,那麼這裡直接進入到下一個for迴圈中獲取鎖;
7. 加鎖並且將waiter數減1,這裡我看了一會,沒用懂什麼意思,其實需要分兩步來理解,相當於state+mutexLocked,然後state再將waiter部分的數減一;
8. 如果當前goroutine不是飢餓狀態或者waiter只有一個,就從飢餓模式切換會正常模式;
9. 設定狀態;
下面用圖例來解釋:
這部分的圖解是休眠前的操作,休眠前會根據old的狀態來判斷能不能直接獲取到鎖,如果old狀態沒有上鎖,也沒有飢餓,那麼直接break返回,因為這種情況會在CAS中設定加上鎖;
接著往下判斷,waitStartTime是否等於0,如果不等於,說明不是第一次來了,而是被喚醒後來到這裡,那麼就不能直接放到隊尾再休眠了,而是要放到隊首,防止長時間搶不到鎖;
![Group 5](https://img.luozhiyun.com/20201218225213.png)
下面這張圖是處於喚醒後的示意圖,如何被喚醒的可以直接到跳到解鎖部分看完再回來。
被喚醒一開始是需要判斷一下當前的starving狀態以及等待的時間如果超過了1ms,那麼會將starving設定為true;
接下來會有一個if判斷, 這裡有個細節,因為是被喚醒的,所以判斷前需要重新獲取一下鎖,如果當前不是飢餓模式,那麼會直接返回,然後重新進入到for迴圈中;
如果當前是處於飢餓模式,那麼會計算一下delta為加鎖,並且當前的goroutine是可以直接搶佔鎖的,所以需要將waiter減一,如果starving不為飢餓,或者等待時間沒有超過1ms,或者waiter只有一個了,那麼還需要將delta減去mutexStarving,表示退出飢餓模式;
最後通過AddInt32將state加上delta,這裡之所以可以直接加上,因為這時候state的mutexLocked值肯定為0,並且mutexStarving位肯定為1,並且在獲取鎖之前至少還有當前一個goroutine在等待佇列中,所以waiter可以直接減1。
![Group 6](https://img.luozhiyun.com/20201218225217.png)
## 解鎖流程
### fast path
```go
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
//返回一個state被減後的值
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
//如果返回的state值不為0,那麼進入到unlockSlow中
m.unlockSlow(new)
}
}
```
這裡主要就是AddInt32重新設定state的mutexLocked位為0,然後判斷新的state值是否不為0,不為0則呼叫unlockSlow方法。
### unlockSlow
![Group 7](https://img.luozhiyun.com/20201218225221.png)
unlockSlow方法裡面也分為正常模式和飢餓模式下的解鎖:
```go
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有 waiter,或者已經有在處理的情況,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// waiter 數減 1,mutexWoken 標誌設定上,通過 CAS 更新 state 的值
new = (old - 1<