golang同步之sync包
阿新 • • 發佈:2019-01-23
golang中實現併發非常簡單,只需在需要併發的函式前面新增關鍵字"go",但是如何處理go併發機制中不同goroutine之間的同步與通訊,golang 中提供了sync包來解決相關的問題,當然還有其他的方式比如channel,原子操作atomic等等,這裡先介紹sync包的用法。
sync 包提供了互斥鎖這類的基本的同步原語.除 Once 和 WaitGroup 之外的型別大多用於底層庫的例程。更高階的同步操作通過通道與通訊進行。
type Cond
func NewCond(l Locker) *Cond
- func (c *Cond) Broadcast()
- func (c *Cond) Signal()
- func (c *Cond) Wait()
- type Locker
- type Mutex
- func (m *Mutex) Lock()
- func (m *Mutex) Unlock()
- type Once
- func (o *Once) Do(f func())
- type Pool
- func (p *Pool) Get() interface{}
- func (p *Pool) Put(x interface{})
- type RWMutex
- func (rw *RWMutex) Lock()寫鎖
- func (rw *RWMutex) RLock() 讀鎖
- func (rw *RWMutex) RLocker() Locker
- func (rw *RWMutex) RUnlock()
- func (rw *RWMutex) Unlock()
- type WaitGroup
- func (wg *WaitGroup) Add(delta int)
- func (wg *WaitGroup) Done()
- func (wg *WaitGroup) Wait()
- golang中的同步是通過sync.WaitGroup來實現的.WaitGroup的功能:它實現了一個類似佇列的結構,可以一直向佇列中新增任務,當任務完成後便從佇列中刪除,如果佇列中的任務沒有完全完成,可以通過Wait()函式來出發阻塞,防止程式繼續進行,直到所有的佇列任務都完成為止.
- WaitGroup總共有三個方法:
Add(delta int), Done(), Wait()。Add:新增或者減少等待goroutine的數量
Done:相當於Add(-1)
Wait:執行阻塞,直到所有的WaitGroup數量變成0 package main import ( "fmt" "sync" ) var waitgroup sync.WaitGroup func function(i int) { fmt.Println(i) waitgroup.Done() //任務完成,將任務佇列中的任務數量-1,其實.Done就是.Add(-1) } func main() { for i := 0; i < 10; i++ { //每建立一個goroutine,就把任務佇列中任務的數量+1 waitgroup.Add(1) go function(i) } //這裡會發生阻塞,直到佇列中所有的任務結束就會解除阻塞 waitgroup.Wait() }
- 程式中需要併發,需要建立多個goroutine,並且一定要等這些併發全部完成後才繼續接下來的程式執行.WaitGroup的特點是Wait()可以用來阻塞直到佇列中的所有任務都完成時才解除阻塞,而不需要sleep一個固定的時間來等待
- 接下來看cond用法,很簡單一個goroutine等待另外的goroutine傳送通知喚醒。
-
package main import ( "fmt" "sync" "time" ) var locker = new(sync.Mutex) var cond = sync.NewCond(locker) var waitgroup sync.WaitGroup func test(x int) { cond.L.Lock() //等待通知,阻塞在此 cond.Wait() fmt.Println(x) time.Sleep(time.Second * 1) defer func() { cond.L.Unlock()//釋放鎖 waitgroup.Done() }() } func main() { for i := 0; i < 10; i++ { go test(i) waitgroup.Add(1); } fmt.Println("start all") time.Sleep(time.Second * 1) // 下發一個通知給已經獲取鎖的goroutine cond.Signal() time.Sleep(time.Second * 1) // 下發一個通知給已經獲取鎖的goroutine cond.Signal() time.Sleep(time.Second * 1) //下發廣播給所有等待的goroutine fmt.Println("start Broadcast") cond.Broadcast() waitgroup.Wait() }
- 然後看Once,它可以保證程式碼段植段只被執行一次,可以用來實現單例。
-
package main import ( "fmt" "sync" "time" ) func main() { var once sync.Once onceBody := func() { time.Sleep(3e9) fmt.Println("Only once") } done := make(chan bool) for i := 0; i < 10; i++ { j := i go func(int) { once.Do(onceBody) fmt.Println(j) done <- true }(j) } <-done time.Sleep(3e9) }
- 用once可以保證上面的oncebody被執行一次,即使被多次呼叫,內部用一個atmoic int欄位標示是否被執行過,和一個鎖來實現,具體的可以看go的原始碼,syc目錄下的once.go
- 然後說道pool,說白了就是一個物件池,這個類設計的目的是用來儲存和複用臨時物件,以減少記憶體分配,降低CG壓力。
- Get返回Pool中的任意一個物件。如果Pool為空,則呼叫New返回一個新建立的物件。如果沒有設定New,則返回nil。還有一個重要的特性是,放進Pool中的物件,會在說不準什麼時候被回收掉。所以如果事先Put進去100個物件,下次Get的時候發現Pool是空也是有可能的。不過這個特性的一個好處就在於不用擔心Pool會一直增長,因為Go已經幫你在Pool中做了回收機制。這個清理過程是在每次垃圾回收之前做的。垃圾回收是固定兩分鐘觸發一次,而且每次清理會將Pool中的所有物件都清理掉!
-
func main(){ // 建立物件 var pipe = &sync.Pool{New:func()interface{}{return "Hello,BeiJing"}} // 準備放入的字串 val := "Hello,World!" // 放入 pipe.Put(val) // 取出 log.Println(pipe.Get()) // 再取就沒有了,會自動呼叫NEW log.Println(pipe.Get()) }
- 最後RWMutex讀寫鎖,RWMutex有兩種鎖寫鎖和讀鎖,用法也有不同,首先讀鎖可以同時加多個,但是寫鎖就不行當你試圖加第二個寫鎖時就回導致當前的goroutine或者執行緒阻塞,但是這裡的讀鎖就不會,那他有什麼作用呢。
- 當有讀鎖,試圖加寫鎖會阻塞,當有寫鎖,試圖加讀鎖時會阻塞,當有讀鎖,試圖加讀鎖時不會阻塞,這樣有什麼好處呢,當我們有一種資料讀操作遠遠多於寫操作時,當我們讀時,如果加mutex或者寫鎖,會大大影響其他執行緒,因為我們大多數是讀操作,因此如果我們加讀鎖,就不會影響其他執行緒的讀操作,同時有執行緒寫時也能保證資料的同步。最後一點很重要,不論是讀鎖還是寫鎖lock和unlock時一一對應的,unlock前一 定要有lock,就像c++的new和delete,一定要注意。
- 下 面看兩個例子:來源:點選開啟連結
- 隨便讀:注意此時此時不能寫。
-
寫的時候不可讀也不可寫package main import ( "sync" "time" ) var m *sync.RWMutex func main() { m = new(sync.RWMutex) // 多個同時讀 go read(1) go read(2) time.Sleep(2*time.Second) } func read(i int) { println(i,"read start") m.RLock() println(i,"reading") time.Sleep(1*time.Second) m.RUnlock() println(i,"read over") }
package main import ( "sync" "time" ) var m *sync.RWMutex func main() { m = new(sync.RWMutex) // 寫的時候啥也不能幹 go write(1) go read(2) go write(3) time.Sleep(2*time.Second) } func read(i int) { println(i,"read start") m.RLock() println(i,"reading") time.Sleep(1*time.Second) m.RUnlock() println(i,"read over") } func write(i int) { println(i,"write start") m.Lock() println(i,"writing") time.Sleep(1*time.Second) m.Unlock() println(i,"write over") }