go中semaphore(訊號量)原始碼解讀
阿新 • • 發佈:2021-04-02
- [執行時訊號量機制 semaphore](#%E8%BF%90%E8%A1%8C%E6%97%B6%E4%BF%A1%E5%8F%B7%E9%87%8F%E6%9C%BA%E5%88%B6-semaphore)
- [前言](#%E5%89%8D%E8%A8%80)
- [作用是什麼](#%E4%BD%9C%E7%94%A8%E6%98%AF%E4%BB%80%E4%B9%88)
- [幾個主要的方法](#%E5%87%A0%E4%B8%AA%E4%B8%BB%E8%A6%81%E7%9A%84%E6%96%B9%E6%B3%95)
- [如何實現](#%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0)
- [sudog 快取](#sudog-%E7%BC%93%E5%AD%98)
- [acquireSudog](#acquiresudog)
- [releaseSudog](#releasesudog)
- [semaphore](#semaphore)
- [poll_runtime_Semacquire/sync_runtime_SemacquireMutex](#poll_runtime_semacquiresync_runtime_semacquiremutex)
- [sync_runtime_Semrelease](#sync_runtime_semrelease)
- [參考](#%E5%8F%82%E8%80%83)
## 執行時訊號量機制 semaphore
### 前言
最近在看原始碼,發現好多地方用到了這個`semaphore`。
本文是在`go version go1.13.15 darwin/amd64`上進行的
### 作用是什麼
下面是官方的描述
```go
// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
// 具體的用法是提供 sleep 和 wakeup 原語
// 以使其能夠在其它同步原語中的競爭情況下使用
// 因此這裡的 semaphore 和 Linux 中的 futex 目標是一致的
// 只不過語義上更簡單一些
//
// 也就是說,不要認為這些是訊號量
// 把這裡的東西看作 sleep 和 wakeup 實現的一種方式
// 每一個 sleep 都會和一個 wakeup 配對
// 即使在發生 race 時,wakeup 在 sleep 之前時也是如此
```
上面提到了和`futex`作用一樣,關於`futex`
> futex(快速使用者區互斥的簡稱)是一個在Linux上實現鎖定和構建高階抽象鎖如訊號量和POSIX互斥的基本工具
> Futex 由一塊能夠被多個程序共享的記憶體空間(一個對齊後的整型變數)組成;這個整型變數的值能夠通過組合語言呼叫CPU提供的原子操作指令來增加或減少,並且一個程序可以等待直到那個值變成正數。Futex 的操作幾乎全部在使用者空間完成;只有當操作結果不一致從而需要仲裁時,才需要進入作業系統核心空間執行。這種機制允許使用 futex 的鎖定原語有非常高的執行效率:由於絕大多數的操作並不需要在多個程序之間進行仲裁,所以絕大多數操作都可以在應用程式空間執行,而不需要使用(相對高代價的)核心系統呼叫。
go中的`semaphore`作用和`futex`目標一樣,提供`sleep`和`wakeup`原語,使其能夠在其它同步原語中的競爭情況下使用。當一個`goroutine`需要休眠時,將其進行集中存放,當需要`wakeup`時,再將其取出,重新放入排程器中。
例如在讀寫鎖的實現中,讀鎖和寫鎖之前的相互阻塞喚醒,就是通過`sleep`和`wakeup`實現,當有讀鎖存在的時候,新加入的寫鎖通過`semaphore`阻塞自己,當前面的讀鎖完成,在通過`semaphore`喚醒被阻塞的寫鎖。
寫鎖
```go
// 獲取互斥鎖
// 阻塞等待所有讀操作結束(如果有的話)
func (rw *RWMutex) Lock() {
...
// 原子的修改readerCount的值,直接將readerCount減去rwmutexMaxReaders
// 說明,有寫鎖進來了,這在上面的讀鎖中也有體現
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 當r不為0說明,當前寫鎖之前有讀鎖的存在
// 修改下readerWait,也就是當前寫鎖需要等待的讀鎖的個數
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 阻塞當前寫鎖
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
```
通過`runtime_SemacquireMutex`對當前寫鎖進行`sleep`
讀鎖釋放
```go
// 減少讀操作計數,即readerCount--
// 喚醒等待寫操作的協程(如果有的話)
func (rw *RWMutex) RUnlock() {
...
// 首先通過atomic的原子性使readerCount-1
// 1.若readerCount大於0, 證明當前還有讀鎖, 直接結束本次操作
// 2.若readerCount小於0, 證明已經沒有讀鎖, 但是還有因為讀鎖被阻塞的寫鎖存在
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 嘗試喚醒被阻塞的寫鎖
rw.rUnlockSlow(r)
}
...
}
func (rw *RWMutex) rUnlockSlow(r int32) {
...
// readerWait--操作,如果readerWait--操作之後的值為0,說明,寫鎖之前,已經沒有讀鎖了
// 通過writerSem訊號量,喚醒佇列中第一個阻塞的寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 喚醒一個寫鎖
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
```
寫鎖處理完之後,呼叫`runtime_Semrelease`來喚醒`sleep`的寫鎖
### 幾個主要的方法
在`go/src/sync/runtime.go`中,定義了這幾個方法
```go
// Semacquire等待*s > 0,然後原子遞減它。
// 它是一個簡單的睡眠原語,用於同步
// library and不應該直接使用。
func runtime_Semacquire(s *uint32)
// SemacquireMutex類似於Semacquire,用來阻塞互斥的物件
// 如果lifo為true,waiter將會被插入到佇列的頭部
// skipframes是跟蹤過程中要省略的幀數,從這裡開始計算
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// Semrelease會自動增加*s並通知一個被Semacquire阻塞的等待的goroutine
// 它是一個簡單的喚醒原語,用於同步
// library and不應該直接使用。
// 如果handoff為true, 傳遞訊號到佇列頭部的waiter
// skipframes是跟蹤過程中要省略的幀數,從這裡開始計算
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
```
具體的實現是在`go/src/runtime/sema.go`中
```go
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
```
### 如何實現
### sudog 快取
`semaphore`的實現使用到了`sudog`,我們先來看下
sudog 是執行時用來存放處於阻塞狀態的`goroutine`的一個上層抽象,是用來實現使用者態訊號量的主要機制之一。 例如當一個`goroutine`因為等待`channel`的資料需要進行阻塞時,`sudog`會將`goroutine`及其用於等待資料的位置進行記錄, 並進而串聯成一個等待佇列,或二叉平衡樹。
```go
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// 以下欄位受hchan保護
g *g
// isSelect 表示 g 正在參與一個 select, so
// 因此 g.selectDone 必須以 CAS 的方式來獲取wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // 資料元素(可能指向棧)
// 以下欄位不會併發訪問。
// 對於通道,waitlink只被g訪問。
// 對於訊號量,所有欄位(包括上面的欄位)
// 只有當持有一個semroot鎖時才被訪問。
acquiretime int64
releasetime int64
ticket uint32
parent *sudog //semaRoot 二叉樹
waitlink *sudog // g.waiting 列表或 semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
```
`sudog`的獲取和歸還,遵循以下策略:
1、獲取,首先從`per-P`快取獲取,對於`per-P`快取,如果`per-P`快取為空,則從全域性池抓取一半,然後取出`per-P`快取中的最後一個;
2、歸還,歸還到`per-P`快取,如果`per-P`快取滿了,就把`per-P`快取的一半歸還到全域性快取中,然後歸還`sudog`到`per-P`快取中。
#### acquireSudog
1、如果`per-P`快取的內容沒達到長度的一般,則會從全域性額快取中抓取一半;
2、然後返回把`per-P`快取中最後一個`sudog`返回,並且置空;
```go
// go/src/runtime/proc.go
//go:nosplit
func acquireSudog() *sudog {
// Delicate dance: 訊號量的實現呼叫acquireSudog,然後acquireSudog呼叫new(sudog)
// new呼叫malloc, malloc呼叫垃圾收集器,垃圾收集器在stopTheWorld呼叫訊號量
// 通過在new(sudog)周圍執行acquirem/releasem來打破迴圈
// acquirem/releasem在new(sudog)期間增加m.locks,防止垃圾收集器被呼叫。
// 獲取當前 g 所在的 m
mp := acquirem()
// 獲取p的指標
pp := mp.p.ptr()
if len(pp.sudogcache) == 0 {
lock(&sched.sudoglock)
// 首先,嘗試從中央快取獲取一批資料。
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s)
}
unlock(&sched.sudoglock)
// 如果中央快取中沒有,新分配
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
// 取快取中最後一個
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
// 將剛取出的在快取中移除
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
releasem(mp)
return s
}
```
#### releaseSudog
1、如果`per-P`快取滿了,就歸還`per-P`快取一般的內容到全域性快取;
2、然後將回收的`sudog`放到`per-P`快取中。
```go
// go/src/runtime/proc.go
//go:nosplit
func releaseSudog(s *sudog) {
if s.elem != nil {
throw("runtime: sudog with non-nil elem")
}
if s.isSelect {
throw("runtime: sudog with non-false isSelect")
}
if s.next != nil {
throw("runtime: sudog with non-nil next")
}
if s.prev != nil {
throw("runtime: sudog with non-nil prev")
}
if s.waitlink != nil {
throw("runtime: sudog with non-nil waitlink")
}
if s.c != nil {
throw("runtime: sudog with non-nil c")
}
gp := getg()
if gp.param != nil {
throw("runtime: releaseSudog with non-nil gp.param")
}
// 避免重新安排到另一個P
mp := acquirem() // avoid rescheduling to another P
pp := mp.p.ptr()
// 如果快取滿了
if len(pp.sudogcache) == cap(pp.sudogcache) {
// 將本地快取記憶體的一半傳輸到中央快取記憶體
var first, last *sudog
for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
n := len(pp.sudogcache)
p := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if first == nil {
first = p
} else {
last.next = p
}
last = p
}
lock(&sched.sudoglock)
last.next = sched.sudogcache
sched.sudogcache = first
unlock(&sched.sudoglock)
}
// 歸還sudog到`per-P`快取中
pp.sudogcache = append(pp.sudogcache, s)
releasem(mp)
}
```
### semaphore
```go
// go/src/runtime/sema.go
// 用於sync.Mutex的非同步訊號量。
// semaRoot擁有一個具有不同地址(s.elem)的sudog平衡樹。
// 每個sudog都可以依次(通過s.waitlink)指向一個列表,在相同地址上等待的其他sudog。
// 對具有相同地址的sudog內部列表進行的操作全部為O(1)。頂層semaRoot列表的掃描為O(log n),
// 其中,n是阻止goroutines的不同地址的數量,通過他們雜湊到給定的semaRoot。
type semaRoot struct {
lock mutex
// waiters的平衡樹的根節點
treap *sudog
// waiters的數量,讀取的時候無所
nwait uint32
}
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
```
#### poll_runtime_Semacquire/sync_runtime_SemacquireMutex
```go
// go/src/runtime/sema.go
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
// 判斷這個goroutine,是否是m上正在執行的那個
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// *addr -= 1
if cansemacquire(addr) {
return
}
// 增加等待計數
// 再試一次 cansemacquire 如果成功則直接返回
// 將自己作為等待者入隊
// 休眠
// (等待器描述符由出隊訊號產生出隊行為)
// 獲取一個sudog
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
// 新增我們自己到nwait來禁用semrelease中的"easy case"
atomic.Xadd(&root.nwait, 1)
// 檢查cansemacquire避免錯過喚醒
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 任何在 cansemacquire 之後的 semrelease 都知道我們在等待(因為設定了 nwait),因此休眠
// 佇列將s新增到semaRoot中被阻止的goroutine中
root.queue(addr, s, lifo)
// 將當前goroutine置於等待狀態並解鎖鎖。
// 通過呼叫goready(gp),可以使goroutine再次可執行。
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
// 歸還sudog
releaseSudog(s)
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
```
#### sync_runtime_Semrelease
```go
// go/src/runtime/sema.go
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
atomic.Xadd(addr, 1)
// Easy case:沒有等待者
// 這個檢查必須發生在xadd之後,以避免錯過喚醒
if atomic.Load(&root.nwait) == 0 {
return
}
// Harder case: 找到等待者,並且喚醒
lock(&root.lock)
if atomic.Load(&root.nwait) == 0 {
// 該計數已被另一個goroutine佔用,
// 因此無需喚醒其他goroutine。
unlock(&root.lock)
return
}
// 搜尋一個等待著然後將其喚醒
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // 可能會很慢,因此先解鎖
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
// goready(s.g, 5)
// 標記 runnable,等待被重新排程
readyWithTime(s, 5+skipframes)
}
}
```
摘自"同步原語"的一段總結
> 這一對 semacquire 和 semrelease 理解上可能不太直觀。 首先,我們必須意識到這兩個函式一定是在兩個不同的 M(執行緒)上得到執行,否則不會出現併發,我們不妨設為 M1 和 M2。 當 M1 上的 G1 執行到 semacquire1 時,如果快速路徑成功,則說明 G1 搶到鎖,能夠繼續執行。但一旦失敗且在慢速路徑下 依然搶不到鎖,則會進入 goparkunlock,將當前的 G1 放到等待佇列中,進而讓 M1 切換並執行其他 G。 當 M2 上的 G2 開始呼叫 semrelease1 時,只是單純的將等待佇列的 G1 重新放到排程佇列中,而當 G1 重新被排程時(假設運氣好又在 M1 上被排程),程式碼仍然會從 goparkunlock 之後開始執行,並再次嘗試競爭訊號量,如果成功,則會歸還 sudog。
### 參考
【同步原語】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go併發程式設計實戰--訊號量的使用方法和其實現原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【程序同步之訊號量機制(pv操作)及三個經典同步問題】https://blog.csdn.net/SpeedMe/article/details/17597373
>**本文作者**:liz
>**本文連結**:https://boilingfrog.github.io/2021/04/02/semaphore/
>**版權宣告**:本文為博主原創文章,遵循 [CC 4.0 BY-SA](https://creativecommons.org/licenses/by-sa/4.0/) 版權協議,轉載請附上原文出處連結和本宣告