1. 程式人生 > >golang同步之sync包

golang同步之sync包

golang中實現併發非常簡單,只需在需要併發的函式前面新增關鍵字"go",但是如何處理go併發機制中不同goroutine之間的同步與通訊,golang 中提供了sync包來解決相關的問題,當然還有其他的方式比如channel,原子操作atomic等等,這裡先介紹sync包的用法。

sync 包提供了互斥鎖這類的基本的同步原語.除 Once 和 WaitGroup 之外的型別大多用於底層庫的例程。更高階的同步操作通過通道與通訊進行。

type Cond

    func NewCond(l Locker) *Cond

    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()
type Locker
type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()
type Once
    func (o *Once) Do(f func())
type Pool
    func (p *Pool) Get() interface{}
    func (p *Pool) Put(x interface{})
type RWMutex
    func (rw *RWMutex) Lock()寫鎖
    func (rw *RWMutex) RLock()                       讀鎖
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()
type WaitGroup
    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()
    golang中的同步是通過sync.WaitGroup來實現的.WaitGroup的功能:它實現了一個類似佇列的結構,可以一直向佇列中新增任務,當任務完成後便從佇列中刪除,如果佇列中的任務沒有完全完成,可以通過Wait()函式來出發阻塞,防止程式繼續進行,直到所有的佇列任務都完成為止.
WaitGroup總共有三個方法:
Add(delta int), Done(), Wait()。Add:新增或者減少等待goroutine的數量
Done:相當於Add(-1)
Wait:執行阻塞,直到所有的WaitGroup數量變成0
package main

import (
	"fmt"
	"sync"
)

var waitgroup sync.WaitGroup

func function(i int) {
	fmt.Println(i)
	waitgroup.Done() //任務完成,將任務佇列中的任務數量-1,其實.Done就是.Add(-1)
}

func main() {
	for i := 0; i < 10; i++ {
	    //每建立一個goroutine,就把任務佇列中任務的數量+1
		waitgroup.Add(1) 
		go function(i)
	}
	//這裡會發生阻塞,直到佇列中所有的任務結束就會解除阻塞
	waitgroup.Wait() 
}
     程式中需要併發,需要建立多個goroutine,並且一定要等這些併發全部完成後才繼續接下來的程式執行.WaitGroup的特點是Wait()可以用來阻塞直到佇列中的所有任務都完成時才解除阻塞,而不需要sleep一個固定的時間來等待
    接下來看cond用法,很簡單一個goroutine等待另外的goroutine傳送通知喚醒。
package main

import (
	"fmt"
	"sync"
	"time"
)
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
var waitgroup sync.WaitGroup

func test(x int) {
	cond.L.Lock()
	//等待通知,阻塞在此
	cond.Wait()
	fmt.Println(x)
	time.Sleep(time.Second * 1)
	defer func() {
		cond.L.Unlock()//釋放鎖
		waitgroup.Done()
	}()
}

func main() {
	for i := 0; i < 10; i++ {
		go test(i)
		waitgroup.Add(1);
	}
	fmt.Println("start all")
	time.Sleep(time.Second * 1)
	// 下發一個通知給已經獲取鎖的goroutine
	cond.Signal()
	time.Sleep(time.Second * 1)
	// 下發一個通知給已經獲取鎖的goroutine
	cond.Signal()
	time.Sleep(time.Second * 1)
	//下發廣播給所有等待的goroutine
	fmt.Println("start Broadcast")
	cond.Broadcast()
	waitgroup.Wait()
}
    然後看Once,它可以保證程式碼段植段只被執行一次,可以用來實現單例。
   
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var once sync.Once
	onceBody := func() {
		time.Sleep(3e9)
		fmt.Println("Only once")
	}
	done := make(chan bool)
	for i := 0; i < 10; i++ {
		j := i
		go func(int) {
			once.Do(onceBody)
			fmt.Println(j)
			done <- true
		}(j)
	}
	<-done
	time.Sleep(3e9)
}
    用once可以保證上面的oncebody被執行一次,即使被多次呼叫,內部用一個atmoic int欄位標示是否被執行過,和一個鎖來實現,具體的可以看go的原始碼,syc目錄下的once.go
    然後說道pool,說白了就是一個物件池,這個類設計的目的是用來儲存和複用臨時物件,以減少記憶體分配,降低CG壓力。
    Get返回Pool中的任意一個物件。如果Pool為空,則呼叫New返回一個新建立的物件。如果沒有設定New,則返回nil。還有一個重要的特性是,放進Pool中的物件,會在說不準什麼時候被回收掉。所以如果事先Put進去100個物件,下次Get的時候發現Pool是空也是有可能的。不過這個特性的一個好處就在於不用擔心Pool會一直增長,因為Go已經幫你在Pool中做了回收機制。這個清理過程是在每次垃圾回收之前做的。垃圾回收是固定兩分鐘觸發一次,而且每次清理會將Pool中的所有物件都清理掉!
   
func main(){
	// 建立物件
	var pipe = &sync.Pool{New:func()interface{}{return "Hello,BeiJing"}}
	// 準備放入的字串
	val := "Hello,World!"
	// 放入
	pipe.Put(val)
	// 取出
	log.Println(pipe.Get())
	// 再取就沒有了,會自動呼叫NEW
	log.Println(pipe.Get())
} 
   最後RWMutex讀寫鎖,RWMutex有兩種鎖寫鎖和讀鎖,用法也有不同,首先讀鎖可以同時加多個,但是寫鎖就不行當你試圖加第二個寫鎖時就回導致當前的goroutine或者執行緒阻塞,但是這裡的讀鎖就不會,那他有什麼作用呢。
   當有讀鎖,試圖加寫鎖會阻塞,當有寫鎖,試圖加讀鎖時會阻塞,當有讀鎖,試圖加讀鎖時不會阻塞,這樣有什麼好處呢,當我們有一種資料讀操作遠遠多於寫操作時,當我們讀時,如果加mutex或者寫鎖,會大大影響其他執行緒,因為我們大多數是讀操作,因此如果我們加讀鎖,就不會影響其他執行緒的讀操作,同時有執行緒寫時也能保證資料的同步。最後一點很重要,不論是讀鎖還是寫鎖lock和unlock時一一對應的,unlock前一 定要有lock,就像c++的new和delete,一定要注意。
   下 面看兩個例子:來源:點選開啟連結
    隨便讀:注意此時此時不能寫。
 
package main

import (
    "sync"
    "time"
)

var m *sync.RWMutex

func main() {
    m = new(sync.RWMutex)
    
    // 多個同時讀
    go read(1)
    go read(2)

    time.Sleep(2*time.Second)
}

func read(i int) {
    println(i,"read start")

    m.RLock()
    println(i,"reading")
    time.Sleep(1*time.Second)
    m.RUnlock()    

    println(i,"read over")
}
   寫的時候不可讀也不可寫    
package main

import (
    "sync"
    "time"
)

var m *sync.RWMutex

func main() {
    m = new(sync.RWMutex)
    
    // 寫的時候啥也不能幹
    go write(1)
    go read(2)
    go write(3)

    time.Sleep(2*time.Second)
}

func read(i int) {
    println(i,"read start")

    m.RLock()
    println(i,"reading")
    time.Sleep(1*time.Second)
    m.RUnlock()    

    println(i,"read over")
}

func write(i int) {
    println(i,"write start")

    m.Lock()
    println(i,"writing")
    time.Sleep(1*time.Second)
    m.Unlock()

    println(i,"write over")
}