1. 程式人生 > >Go中由WaitGroup引發對記憶體對齊思考

Go中由WaitGroup引發對記憶體對齊思考

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的go的原始碼時14.4 WaitGroup使用大家都會,但是其中是怎麼實現的我們也需要知道,這樣才能在專案中儘可能的避免由於不正確的使用引發的panic。並且本文也將寫一下記憶體對齊方面做一個解析,喜歡大家喜歡。 ## WaitGroup介紹 WaitGroup 提供了三個方法: ```go func (wg *WaitGroup) Add(delta int) func (wg *WaitGroup) Done() func (wg *WaitGroup) Wait() ``` * Add,用來設定 WaitGroup 的計數值; * Done,用來將 WaitGroup 的計數值減 1,其實就是呼叫了 Add(-1); * Wait,呼叫這個方法的 goroutine 會一直阻塞,直到 WaitGroup 的計數值變為 0。 例子我就不舉了,網上是很多的,下面我們直接進入正題。 ## 解析 ```go type noCopy struct{} type WaitGroup struct { // 避免複製使用的一個技巧,可以告訴vet工具違反了複製使用的規則 noCopy noCopy // 一個複合值,用來表示waiter數、計數值、訊號量 state1 [3]uint32 } // 獲取state的地址和訊號量的地址 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // 如果地址是64bit對齊的,陣列前兩個元素做state,後一個元素做訊號量 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { // 如果地址是32bit對齊的,陣列後兩個元素用來做state,它可以用來做64bit的原子操作,第一個元素32bit用來做訊號量 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } } ``` 這裡剛開始,WaitGroup就秀了一把肌肉,讓我們看看大牛是怎麼寫程式碼的,思考一個原子操作在不同架構平臺上是怎麼操作的,在看state方法裡面為什麼要這麼做之前,我們先來看看記憶體對齊。 ### 記憶體對齊 在維基百科https://en.wikipedia.org/wiki/Data_structure_alignment上我們可以看到對於記憶體對齊的定義: > A memory address *a* is said to be *n-byte aligned* when *a* is a multiple of *n* [bytes](https://en.wikipedia.org/wiki/Byte) (where *n* is a power of 2). 簡而言之,現在的CPU訪問記憶體的時候是一次性訪問多個bytes,比如32位架構一次訪問4bytes,該處理器只能從地址為4的倍數的記憶體開始讀取資料,所以要求資料在存放的時候首地址的值是4的倍數存放,者就是所謂的記憶體對齊。 由於找不到Go語言的對齊規則,我對照了一下C語言的記憶體對齊的規則,可以和Go語言匹配的上,所以先參照下面的規則。 記憶體對齊遵循下面三個原則: 1. 結構體變數的**起始地址**能夠被其最寬的成員大小整除; 2. 結構體每個成員相對於**起始地址的偏移**能夠被其**自身大小整除**,如果不能則在**前一個成員後面**補充位元組; 3. 結構體總體大小能夠**被最寬的成員的大小**整除,如不能則在**後面**補充位元組; 通過下面的例子來實操一下記憶體對齊: 在32位架構中,int8佔1byte,int32佔4bytes,int16佔2bytes。 ```go type A struct { a int8 b int32 c int16 } type B struct { a int8 c int16 b int32 } func main() { fmt.Printf("arrange fields to reduce size:\n"+ "A align: %d, size: %d\n" , unsafe.Alignof(A{}), unsafe.Sizeof(A{}) ) fmt.Printf("arrange fields to reduce size:\n"+ "B align: %d, size: %d\n" , unsafe.Alignof(B{}), unsafe.Sizeof(B{}) ) } //output: //arrange fields to reduce size: //A align: 4, size: 12 //arrange fields to reduce size: //B align: 4, size: 8 ``` 下面以在32位的架構中執行為例子: 在32位架構的系統中預設的對齊大小是4bytes。 假設結構體A中a的起始地址為0x0000,能夠被最寬的資料成員大小4bytes(int32)整除,所以從0x0000開始存放佔用一個位元組即0x0000~0x0001;b是int32,佔4bytes,所以要滿足條件2,需要在a後面padding3個byte,從0x0004開始;c是int16,佔2bytes故從0x0008開始佔用兩個位元組,即0x0008~0x0009;此時整個結構體佔用的空間是0x0000~0x0009佔用10個位元組,10%4 != 0, 不滿足第三個原則,所以需要在後面補充兩個位元組,即最後記憶體對齊後佔用的空間是0x0000~0x000B,一共12個位元組。
同理,相比結構體B則要緊湊些: ### WaitGroup中state方法的記憶體對齊 在講之前需要注意的是noCopy是一個空的結構體,大小為0,不需要做記憶體對齊,所以大家在看的時候可以忽略這個欄位。 在WaitGroup裡面,使用了uint32的陣列來構造state1欄位,然後根據系統的位數的不同構造不同的返回值,下面我面先來說說怎麼通過sate1這個欄位構建waiter數、計數值、訊號量的。 首先`unsafe.Pointer`來獲取state1的地址值然後轉換成uintptr型別的,然後判斷一下這個地址值是否能被8整除,這裡通過地址 mod 8的方式來判斷地址是否是64位對齊。 因為有記憶體對齊的存在,在64位架構裡面WaitGroup結構體state1起始的位置肯定是64位對齊的,所以在64位架構上用state1前兩個元素併成uint64來表示statep,state1最後一個元素表示semap; 那麼64位架構上面獲取state1的時候能不能第一個元素表示semap,後兩個元素拼成64位返回呢? 答案自然是不可以,因為uint32的對齊保證是4bytes,64位架構中一次性處理事務的一個固定長度是8bytes,如果用state1的後兩個元素表示一個64位字的欄位的話CPU需要讀取記憶體兩次,不能保證原子性。 但是在32位架構裡面,一個字長是4bytes,要操作64位的資料分佈在**兩個資料塊**中,需要兩次操作才能完成訪問。如果兩次操作中間有可能別其他操作修改,不能保證原子性。 同理32位架構想要原子性的操作8bytes,需要由呼叫方保證其資料地址是64位對齊的,否則原子訪問會有異常,我們在這裡https://golang.org/pkg/sync/atomic/#pkg-note-BUG可以看到描述: >
On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned. 所以為了保證64位字對齊,只能讓變數或開闢的結構體、陣列和切片值中的第一個64位字可以被認為是64位字對齊。但是在使用WaitGroup的時候會有巢狀的情況,不能保證總是讓WaitGroup存在於結構體的第一個欄位上,所以我們需要增加填充使它能對齊64位字。 在32位架構中,WaitGroup在初始化的時候,分配記憶體地址的時候是隨機的,所以WaitGroup結構體state1起始的位置不一定是64位對齊,可能會是:`uintptr(unsafe.Pointer(&wg.state1))%8 = 4`,如果出現這樣的情況,那麼就需要用state1的第一個元素做padding,用state1的後兩個元素合併成uint64來表示statep。 #### 小結 這裡小結一下,因為為了完成上面的這篇內容實在是查閱了很多資料,才得出這樣的結果。所以這裡小結一下,在64位架構中,CPU每次操作的字長都是8bytes,編譯器會自動幫我們把結構體的第一個欄位的地址初始化成64位對齊的,所以64位架構上用state1前兩個元素併成uint64來表示statep,state1最後一個元素表示semap; 然後在32位架構中,在初始化WaitGroup的時候,編譯器只能保證32位對齊,不能保證64位對齊,所以通過`uintptr(unsafe.Pointer(&wg.state1))%8`判斷是否等於0來看state1記憶體地址是否是64位對齊,如果是,那麼也和64位架構一樣,用state1前兩個元素併成uint64來表示statep,state1最後一個元素表示semap,否則用state1的第一個元素做padding,用state1的後兩個元素合併成uint64來表示statep。 如果我說錯了,歡迎來diss我,我覺得我需要學習的地方還有很多。
### Add 方法 ```go func (wg *WaitGroup) Add(delta int) { // 獲取狀態值 statep, semap := wg.state() ... // 高32bit是計數值v,所以把delta左移32,增加到計數上 state := atomic.AddUint64(statep, uint64(delta)<<32) // 獲取計數器的值 v := int32(state >> 32) // 獲取waiter的值 w := uint32(state) ... // 任務計數器不能為負數 if v < 0 { panic("sync: negative WaitGroup counter") } // wait不等於0說明已經執行了Wait,此時不容許Add if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 計數器的值大於或者沒有waiter在等待,直接返回 if v > 0 || w == 0 { return } if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 此時,counter一定等於0,而waiter一定大於0 // 先把counter置為0,再釋放waiter個數的訊號量 *statep = 0 for ; w != 0; w-- { //釋放訊號量,執行一次釋放一個,喚醒一個等待者 runtime_Semrelease(semap, false, 0) } } ``` 1. add方法首先會呼叫state方法獲取statep、semap的值。statep是一個uint64型別的值,高32位用來記錄add方法傳入的delta值之和;低32位用來表示呼叫wait方法等待的goroutine的數量,也就是waiter的數量。如下: 2. add方法會呼叫`atomic.AddUint64`方法將傳入的delta左移32位,也就是將counter加上delta的值; 3. 因為計數器counter可能為負數,所以int32來獲取計數器的值,waiter不可能為負數,所以使用uint32來獲取; 4. 接下來就是一系列的校驗,v不能小於零表示任務計數器不能為負數,否則會panic;w不等於,並且v的值等於delta表示wait方法先於add方法執行,此時也會panic,因為waitgroup不允許呼叫了Wait方法後還呼叫add方法; 5. v大於零或者w等於零直接返回,說明這個時候不需要釋放waiter,所以直接返回; 6. ` *statep != state`到了這個校驗這裡,狀態只能是waiter大於零並且counter為零。當waiter大於零的時候是不允許再呼叫add方法,counter為零的時候也不能呼叫wait方法,所以這裡使用state的值和記憶體的地址值進行比較,檢視是否呼叫了add或者wait導致state變動,如果有就是非法呼叫會引起panic; 7. 最後將statep值重置為零,然後釋放所有的waiter; ### Wait方法 ```go func (wg *WaitGroup) Wait() { statep, semap := wg.state() ... for { state := atomic.LoadUint64(statep) // 獲取counter v := int32(state >> 32) // 獲取waiter w := uint32(state) // counter為零,不需要等待直接返回 if v == 0 { ... return } // 使用CAS將waiter加1 if atomic.CompareAndSwapUint64(statep, state, state+1) { ... // 掛起等待喚醒 runtime_Semacquire(semap) // 喚醒之後statep不為零,表示WaitGroup又被重複使用,這回panic if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } ... // 直接返回 return } } } ``` 1. Wait方法首先也是呼叫state方法獲取狀態值; 2. 進入for迴圈之後Load statep的值,然後分別獲取counter和counter; 3. 如果counter已經為零了,那麼直接返回不需要等待; 4. counter不為零,那麼使用CAS將waiter加1,由於CAS可能失敗,所以for迴圈會再次的回到這裡進行CAS,直到成功; 5. 呼叫runtime_Semacquire掛起等待喚醒; 6. `*statep != 0`喚醒之後statep不為零,表示WaitGroup又被重複使用,這會panic。需要注意的是waitgroup並不是不讓重用,而是不能在wait方法還沒執行完就開始重用。 ### waitgroup使用小結 看完了waitgroup的add方法與wait方法,我們發現裡面有很多校驗,使用不當會導致panic,所以我們需要總結一下如何正確使用: * 不能將計數器設定為負數,否則會發生panic;注意有兩種方式會導致計數器為負數,一是呼叫 Add 的時候傳遞一個負數,第二是呼叫 Done 方法的次數過多,超過了 WaitGroup 的計數值; * 在使用 WaitGroup 的時候,一定要等所有的 Add 方法呼叫之後再呼叫 Wait,否則就可能導致 panic; * wait還沒結束就重用 WaitGroup。WaitGroup是可以重用的,但是需要等上一批的goroutine 都呼叫wait完畢後才能繼續重用WaitGroup; ## 總結 waitgroup裡面的程式碼實際上是非常的簡單的,這篇文章主要是由waitgroup引入了記憶體對齊這個概念。由waitgroup帶我們看了在實際的程式碼中是如何利用記憶體對齊這個概念的,以及如何在32為作業系統中原子性的操作64位長的欄位。 除了記憶體對齊的概念以外通過原始碼我們也瞭解到了使用waitgroup的時候需要怎麼做才是符合規範的,不會引發panic。 ## Reference http://blog.newbmiao.com/2020/02/10/dig101-golang-struct-memory-align.html https://gfw.go101.org/article/memory-layout.html https://golang.org/pkg/sync/atomic/#pkg-note-BUG https://en.wikipedia.org/wiki/Data_structure_alignment https://www.zhihu.com/question/