Go中由WaitGroup引發對記憶體對齊思考
阿新 • • 發佈:2021-01-17
> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com
>
> 本文使用的go的原始碼時14.4
WaitGroup使用大家都會,但是其中是怎麼實現的我們也需要知道,這樣才能在專案中儘可能的避免由於不正確的使用引發的panic。並且本文也將寫一下記憶體對齊方面做一個解析,喜歡大家喜歡。
## WaitGroup介紹
WaitGroup 提供了三個方法:
```go
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
```
* Add,用來設定 WaitGroup 的計數值;
* Done,用來將 WaitGroup 的計數值減 1,其實就是呼叫了 Add(-1);
* Wait,呼叫這個方法的 goroutine 會一直阻塞,直到 WaitGroup 的計數值變為 0。
例子我就不舉了,網上是很多的,下面我們直接進入正題。
## 解析
```go
type noCopy struct{}
type WaitGroup struct {
// 避免複製使用的一個技巧,可以告訴vet工具違反了複製使用的規則
noCopy noCopy
// 一個複合值,用來表示waiter數、計數值、訊號量
state1 [3]uint32
}
// 獲取state的地址和訊號量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是64bit對齊的,陣列前兩個元素做state,後一個元素做訊號量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是32bit對齊的,陣列後兩個元素用來做state,它可以用來做64bit的原子操作,第一個元素32bit用來做訊號量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
```
這裡剛開始,WaitGroup就秀了一把肌肉,讓我們看看大牛是怎麼寫程式碼的,思考一個原子操作在不同架構平臺上是怎麼操作的,在看state方法裡面為什麼要這麼做之前,我們先來看看記憶體對齊。
### 記憶體對齊
在維基百科https://en.wikipedia.org/wiki/Data_structure_alignment上我們可以看到對於記憶體對齊的定義:
> A memory address *a* is said to be *n-byte aligned* when *a* is a multiple of *n* [bytes](https://en.wikipedia.org/wiki/Byte) (where *n* is a power of 2).
簡而言之,現在的CPU訪問記憶體的時候是一次性訪問多個bytes,比如32位架構一次訪問4bytes,該處理器只能從地址為4的倍數的記憶體開始讀取資料,所以要求資料在存放的時候首地址的值是4的倍數存放,者就是所謂的記憶體對齊。
由於找不到Go語言的對齊規則,我對照了一下C語言的記憶體對齊的規則,可以和Go語言匹配的上,所以先參照下面的規則。
記憶體對齊遵循下面三個原則:
1. 結構體變數的**起始地址**能夠被其最寬的成員大小整除;
2. 結構體每個成員相對於**起始地址的偏移**能夠被其**自身大小整除**,如果不能則在**前一個成員後面**補充位元組;
3. 結構體總體大小能夠**被最寬的成員的大小**整除,如不能則在**後面**補充位元組;
通過下面的例子來實操一下記憶體對齊:
在32位架構中,int8佔1byte,int32佔4bytes,int16佔2bytes。
```go
type A struct {
a int8
b int32
c int16
}
type B struct {
a int8
c int16
b int32
}
func main() {
fmt.Printf("arrange fields to reduce size:\n"+
"A align: %d, size: %d\n" ,
unsafe.Alignof(A{}), unsafe.Sizeof(A{}) )
fmt.Printf("arrange fields to reduce size:\n"+
"B align: %d, size: %d\n" ,
unsafe.Alignof(B{}), unsafe.Sizeof(B{}) )
}
//output:
//arrange fields to reduce size:
//A align: 4, size: 12
//arrange fields to reduce size:
//B align: 4, size: 8
```
下面以在32位的架構中執行為例子:
在32位架構的系統中預設的對齊大小是4bytes。
假設結構體A中a的起始地址為0x0000,能夠被最寬的資料成員大小4bytes(int32)整除,所以從0x0000開始存放佔用一個位元組即0x0000~0x0001;b是int32,佔4bytes,所以要滿足條件2,需要在a後面padding3個byte,從0x0004開始;c是int16,佔2bytes故從0x0008開始佔用兩個位元組,即0x0008~0x0009;此時整個結構體佔用的空間是0x0000~0x0009佔用10個位元組,10%4 != 0, 不滿足第三個原則,所以需要在後面補充兩個位元組,即最後記憶體對齊後佔用的空間是0x0000~0x000B,一共12個位元組。
同理,相比結構體B則要緊湊些:
### WaitGroup中state方法的記憶體對齊
在講之前需要注意的是noCopy是一個空的結構體,大小為0,不需要做記憶體對齊,所以大家在看的時候可以忽略這個欄位。
在WaitGroup裡面,使用了uint32的陣列來構造state1欄位,然後根據系統的位數的不同構造不同的返回值,下面我面先來說說怎麼通過sate1這個欄位構建waiter數、計數值、訊號量的。
首先`unsafe.Pointer`來獲取state1的地址值然後轉換成uintptr型別的,然後判斷一下這個地址值是否能被8整除,這裡通過地址 mod 8的方式來判斷地址是否是64位對齊。
因為有記憶體對齊的存在,在64位架構裡面WaitGroup結構體state1起始的位置肯定是64位對齊的,所以在64位架構上用state1前兩個元素併成uint64來表示statep,state1最後一個元素表示semap;
那麼64位架構上面獲取state1的時候能不能第一個元素表示semap,後兩個元素拼成64位返回呢?
答案自然是不可以,因為uint32的對齊保證是4bytes,64位架構中一次性處理事務的一個固定長度是8bytes,如果用state1的後兩個元素表示一個64位字的欄位的話CPU需要讀取記憶體兩次,不能保證原子性。
但是在32位架構裡面,一個字長是4bytes,要操作64位的資料分佈在**兩個資料塊**中,需要兩次操作才能完成訪問。如果兩次操作中間有可能別其他操作修改,不能保證原子性。
同理32位架構想要原子性的操作8bytes,需要由呼叫方保證其資料地址是64位對齊的,否則原子訪問會有異常,我們在這裡https://golang.org/pkg/sync/atomic/#pkg-note-BUG可以看到描述:
> On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.
所以為了保證64位字對齊,只能讓變數或開闢的結構體、陣列和切片值中的第一個64位字可以被認為是64位字對齊。但是在使用WaitGroup的時候會有巢狀的情況,不能保證總是讓WaitGroup存在於結構體的第一個欄位上,所以我們需要增加填充使它能對齊64位字。
在32位架構中,WaitGroup在初始化的時候,分配記憶體地址的時候是隨機的,所以WaitGroup結構體state1起始的位置不一定是64位對齊,可能會是:`uintptr(unsafe.Pointer(&wg.state1))%8 = 4`,如果出現這樣的情況,那麼就需要用state1的第一個元素做padding,用state1的後兩個元素合併成uint64來表示statep。
#### 小結
這裡小結一下,因為為了完成上面的這篇內容實在是查閱了很多資料,才得出這樣的結果。所以這裡小結一下,在64位架構中,CPU每次操作的字長都是8bytes,編譯器會自動幫我們把結構體的第一個欄位的地址初始化成64位對齊的,所以64位架構上用state1前兩個元素併成uint64來表示statep,state1最後一個元素表示semap;
然後在32位架構中,在初始化WaitGroup的時候,編譯器只能保證32位對齊,不能保證64位對齊,所以通過`uintptr(unsafe.Pointer(&wg.state1))%8`判斷是否等於0來看state1記憶體地址是否是64位對齊,如果是,那麼也和64位架構一樣,用state1前兩個元素併成uint64來表示statep,state1最後一個元素表示semap,否則用state1的第一個元素做padding,用state1的後兩個元素合併成uint64來表示statep。
如果我說錯了,歡迎來diss我,我覺得我需要學習的地方還有很多。
### Add 方法
```go
func (wg *WaitGroup) Add(delta int) {
// 獲取狀態值
statep, semap := wg.state()
...
// 高32bit是計數值v,所以把delta左移32,增加到計數上
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 獲取計數器的值
v := int32(state >> 32)
// 獲取waiter的值
w := uint32(state)
...
// 任務計數器不能為負數
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// wait不等於0說明已經執行了Wait,此時不容許Add
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 計數器的值大於或者沒有waiter在等待,直接返回
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 此時,counter一定等於0,而waiter一定大於0
// 先把counter置為0,再釋放waiter個數的訊號量
*statep = 0
for ; w != 0; w-- {
//釋放訊號量,執行一次釋放一個,喚醒一個等待者
runtime_Semrelease(semap, false, 0)
}
}
```
1. add方法首先會呼叫state方法獲取statep、semap的值。statep是一個uint64型別的值,高32位用來記錄add方法傳入的delta值之和;低32位用來表示呼叫wait方法等待的goroutine的數量,也就是waiter的數量。如下:
2. add方法會呼叫`atomic.AddUint64`方法將傳入的delta左移32位,也就是將counter加上delta的值;
3. 因為計數器counter可能為負數,所以int32來獲取計數器的值,waiter不可能為負數,所以使用uint32來獲取;
4. 接下來就是一系列的校驗,v不能小於零表示任務計數器不能為負數,否則會panic;w不等於,並且v的值等於delta表示wait方法先於add方法執行,此時也會panic,因為waitgroup不允許呼叫了Wait方法後還呼叫add方法;
5. v大於零或者w等於零直接返回,說明這個時候不需要釋放waiter,所以直接返回;
6. ` *statep != state`到了這個校驗這裡,狀態只能是waiter大於零並且counter為零。當waiter大於零的時候是不允許再呼叫add方法,counter為零的時候也不能呼叫wait方法,所以這裡使用state的值和記憶體的地址值進行比較,檢視是否呼叫了add或者wait導致state變動,如果有就是非法呼叫會引起panic;
7. 最後將statep值重置為零,然後釋放所有的waiter;
### Wait方法
```go
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
...
for {
state := atomic.LoadUint64(statep)
// 獲取counter
v := int32(state >> 32)
// 獲取waiter
w := uint32(state)
// counter為零,不需要等待直接返回
if v == 0 {
...
return
}
// 使用CAS將waiter加1
if atomic.CompareAndSwapUint64(statep, state, state+1) {
...
// 掛起等待喚醒
runtime_Semacquire(semap)
// 喚醒之後statep不為零,表示WaitGroup又被重複使用,這回panic
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
...
// 直接返回
return
}
}
}
```
1. Wait方法首先也是呼叫state方法獲取狀態值;
2. 進入for迴圈之後Load statep的值,然後分別獲取counter和counter;
3. 如果counter已經為零了,那麼直接返回不需要等待;
4. counter不為零,那麼使用CAS將waiter加1,由於CAS可能失敗,所以for迴圈會再次的回到這裡進行CAS,直到成功;
5. 呼叫runtime_Semacquire掛起等待喚醒;
6. `*statep != 0`喚醒之後statep不為零,表示WaitGroup又被重複使用,這會panic。需要注意的是waitgroup並不是不讓重用,而是不能在wait方法還沒執行完就開始重用。
### waitgroup使用小結
看完了waitgroup的add方法與wait方法,我們發現裡面有很多校驗,使用不當會導致panic,所以我們需要總結一下如何正確使用:
* 不能將計數器設定為負數,否則會發生panic;注意有兩種方式會導致計數器為負數,一是呼叫 Add 的時候傳遞一個負數,第二是呼叫 Done 方法的次數過多,超過了 WaitGroup 的計數值;
* 在使用 WaitGroup 的時候,一定要等所有的 Add 方法呼叫之後再呼叫 Wait,否則就可能導致 panic;
* wait還沒結束就重用 WaitGroup。WaitGroup是可以重用的,但是需要等上一批的goroutine 都呼叫wait完畢後才能繼續重用WaitGroup;
## 總結
waitgroup裡面的程式碼實際上是非常的簡單的,這篇文章主要是由waitgroup引入了記憶體對齊這個概念。由waitgroup帶我們看了在實際的程式碼中是如何利用記憶體對齊這個概念的,以及如何在32為作業系統中原子性的操作64位長的欄位。
除了記憶體對齊的概念以外通過原始碼我們也瞭解到了使用waitgroup的時候需要怎麼做才是符合規範的,不會引發panic。
## Reference
http://blog.newbmiao.com/2020/02/10/dig101-golang-struct-memory-align.html
https://gfw.go101.org/article/memory-layout.html
https://golang.org/pkg/sync/atomic/#pkg-note-BUG
https://en.wikipedia.org/wiki/Data_structure_alignment
https://www.zhihu.com/question/