1. 程式人生 > 其它 >Golang sync.WaitGroup 原始碼分析

Golang sync.WaitGroup 原始碼分析

結構

// WaitGroup型別的資料不可以被複制
type WaitGroup struct {
	noCopy noCopy	// 用來禁止當前結構的型別複製

    // state1 是 64-bit變數:
    //   高32位是計數器counter,也就是活躍的g的個數
    //   低32位表示因執行Wait()而阻塞的g的數量,即waiters
    // state2 表示sema訊號量,說明本章程式碼用到了原語
    // 64-bit的原子操作需要64-bit的對齊,但是32位的編譯器只能保證64-bit欄位是32位對齊
    // 因此,在32位架構上,我們需要在state()中檢查state1是否對齊,
    // 並在需要時動態地 "交換 "欄位順序。
	state1 uint64
	state2 uint32
}

Add

Add

func (wg *WaitGroup) Add(delta int) {
    // 獲取state1 和 state2
	statep, semap := wg.state()
    // 競爭檢測程式碼不看
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
    // counter加delta
	state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)	// 獲取當前活躍的g的數量
    w := uint32(state)		// 獲取當前Wait()的次數
    // 競爭檢測程式碼,不看
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
    // 活躍的g個數不能是負數個,有可能delta傳的是負數
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
    // 說明先呼叫的Wait()再呼叫Add(),正常應該反過來
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
    // Add()執行成功返回
	if v > 0 || w == 0 {
		return
	}
	// 到這一步說明 counter == 0 且 waiters > 0.
	// 如果*statep != state,意味著有以下兩種錯誤情況發生
    // - Add()和Wait()併發(concurrently)呼叫
	// - 當counter歸零時,waiters數量還在增加
	// 發生這兩種情況之一就要panic,以保證WaitGroup不被濫用
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 如果WaitGroup使用規範,到這一步counter為0說明已無活躍g了
    // 將state1置0,同時釋放所有的waiters
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

state

// 返回存state1和state2,即state和sema,
// 因為32位編譯器不能直接對齊64位資料,需要這個函式做對齊工作
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		//如果state1是64-bit則直接原樣返回
		return &wg.state1, &wg.state2
	} else {
		// 如果state1是32-bit對齊而不是64-bit對齊,
		// 那麼(&state1)+4 就是 64-bit 對齊了
		state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
	}
}
  • unsafe.Alignof(T Type) 返回對齊方式,下表是32位系統和64位系統各種size的資料的對齊方式值

    32位系統:

    Sizeof(x)   = 16  Alignof(x)   = 4
    Sizeof(x.a) = 1   Alignof(x.a) = 1 Offsetof(x.a) = 0
    Sizeof(x.b) = 2   Alignof(x.b) = 2 Offsetof(x.b) = 2
    Sizeof(x.c) = 12  Alignof(x.c) = 4 Offsetof(x.c) = 4
    

    64位系統:

    Sizeof(x)   = 32  Alignof(x)   = 8
    Sizeof(x.a) = 1   Alignof(x.a) = 1 Offsetof(x.a) = 0
    Sizeof(x.b) = 2   Alignof(x.b) = 2 Offsetof(x.b) = 2
    Sizeof(x.c) = 24  Alignof(x.c) = 8 Offsetof(x.c) = 8
    
  • unsafe.Pointer(&T) 是一個可以包含任意型別變數的地址的通用指標,所以unsafe.Pointer()常用於各種指標相互轉換的橋樑。有四個特有的操作:

    1. 任何型別的指標都可以被轉化為Pointer
    2. Pointer可以被轉化為任何型別的指標
    3. uintptr可以被轉化為Pointer
    4. Pointer可以被轉化為uintptr

    不能直接通過 *p 方式來取得真實的變數值,因為不知道變數的具體型別。可以通過 uintptr(unsafe.Pointer(&T)) 來進行運算。

    unsafe.Pointer是可以比較的,並且支援和nil常量比較判斷是否為空指標。

  • uintptr() 主要用來進行指標計算,本質是一個整型。一般用 uintptr(unsafe.Pointer(&T))來進行運算(T的型別未知)。

    注意:首先GC不認為uintptr是一個活引用,因此uintptr指向的物件可能被gc回收。其次,如果uintptr關聯的物件移動,則其值也不會更新,即uintptr無法保持對變數的引用。

Done

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

Wait

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    // 獲取state1和state2的地址
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
        // 原語:原子操作獲取state1的值
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)	// counter,活躍的g的數量
		w := uint32(state)		// waiters
        // 如果沒有活躍的g了,直接返回
		if v == 0 {
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// CAS操作增加waiters的數量
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
            // 競爭檢測程式碼,不看
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
            // 到這一步,說明當前g要阻塞等待了
            // 原語:根據semap的值也就是state2的地址找到相應的阻塞佇列,把當前g放進去,並掛起
			runtime_Semacquire(semap)
            
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}