Golang sync.WaitGroup 原始碼分析
阿新 • • 發佈:2021-11-27
結構
// WaitGroup型別的資料不可以被複制 type WaitGroup struct { noCopy noCopy // 用來禁止當前結構的型別複製 // state1 是 64-bit變數: // 高32位是計數器counter,也就是活躍的g的個數 // 低32位表示因執行Wait()而阻塞的g的數量,即waiters // state2 表示sema訊號量,說明本章程式碼用到了原語 // 64-bit的原子操作需要64-bit的對齊,但是32位的編譯器只能保證64-bit欄位是32位對齊 // 因此,在32位架構上,我們需要在state()中檢查state1是否對齊, // 並在需要時動態地 "交換 "欄位順序。 state1 uint64 state2 uint32 }
Add
Add
func (wg *WaitGroup) Add(delta int) { // 獲取state1 和 state2 statep, semap := wg.state() // 競爭檢測程式碼不看 if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } // counter加delta state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) // 獲取當前活躍的g的數量 w := uint32(state) // 獲取當前Wait()的次數 // 競爭檢測程式碼,不看 if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(semap)) } // 活躍的g個數不能是負數個,有可能delta傳的是負數 if v < 0 { panic("sync: negative WaitGroup counter") } // 說明先呼叫的Wait()再呼叫Add(),正常應該反過來 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Add()執行成功返回 if v > 0 || w == 0 { return } // 到這一步說明 counter == 0 且 waiters > 0. // 如果*statep != state,意味著有以下兩種錯誤情況發生 // - Add()和Wait()併發(concurrently)呼叫 // - 當counter歸零時,waiters數量還在增加 // 發生這兩種情況之一就要panic,以保證WaitGroup不被濫用 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 如果WaitGroup使用規範,到這一步counter為0說明已無活躍g了 // 將state1置0,同時釋放所有的waiters *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } }
state
// 返回存state1和state2,即state和sema, // 因為32位編譯器不能直接對齊64位資料,需要這個函式做對齊工作 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { //如果state1是64-bit則直接原樣返回 return &wg.state1, &wg.state2 } else { // 如果state1是32-bit對齊而不是64-bit對齊, // 那麼(&state1)+4 就是 64-bit 對齊了 state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) return (*uint64)(unsafe.Pointer(&state[1])), &state[0] } }
-
unsafe.Alignof(T Type) 返回對齊方式,下表是32位系統和64位系統各種size的資料的對齊方式值
32位系統:
Sizeof(x) = 16 Alignof(x) = 4 Sizeof(x.a) = 1 Alignof(x.a) = 1 Offsetof(x.a) = 0 Sizeof(x.b) = 2 Alignof(x.b) = 2 Offsetof(x.b) = 2 Sizeof(x.c) = 12 Alignof(x.c) = 4 Offsetof(x.c) = 4
64位系統:
Sizeof(x) = 32 Alignof(x) = 8 Sizeof(x.a) = 1 Alignof(x.a) = 1 Offsetof(x.a) = 0 Sizeof(x.b) = 2 Alignof(x.b) = 2 Offsetof(x.b) = 2 Sizeof(x.c) = 24 Alignof(x.c) = 8 Offsetof(x.c) = 8
-
unsafe.Pointer(&T) 是一個可以包含任意型別變數的地址的通用指標,所以unsafe.Pointer()常用於各種指標相互轉換的橋樑。有四個特有的操作:
- 任何型別的指標都可以被轉化為Pointer
- Pointer可以被轉化為任何型別的指標
- uintptr可以被轉化為Pointer
- Pointer可以被轉化為uintptr
不能直接通過 *p 方式來取得真實的變數值,因為不知道變數的具體型別。可以通過 uintptr(unsafe.Pointer(&T)) 來進行運算。
unsafe.Pointer是可以比較的,並且支援和nil常量比較判斷是否為空指標。
-
uintptr() 主要用來進行指標計算,本質是一個整型。一般用 uintptr(unsafe.Pointer(&T))來進行運算(T的型別未知)。
注意:首先GC不認為uintptr是一個活引用,因此uintptr指向的物件可能被gc回收。其次,如果uintptr關聯的物件移動,則其值也不會更新,即uintptr無法保持對變數的引用。
Done
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait
Wait
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
// 獲取state1和state2的地址
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
// 原語:原子操作獲取state1的值
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // counter,活躍的g的數量
w := uint32(state) // waiters
// 如果沒有活躍的g了,直接返回
if v == 0 {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// CAS操作增加waiters的數量
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 競爭檢測程式碼,不看
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// 到這一步,說明當前g要阻塞等待了
// 原語:根據semap的值也就是state2的地址找到相應的阻塞佇列,把當前g放進去,並掛起
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}