go獲取協程(goroutine)號的例項
我就廢話不多說了,大家還是直接看程式碼吧~
func GetGID() uint64 { b := make([]byte,64) b = b[:runtime.Stack(b,false)] b = bytes.TrimPrefix(b,[]byte("goroutine ")) b = b[:bytes.IndexByte(b,' ')] n,_ := strconv.ParseUint(string(b),10,64) return n }
補充:Go語言併發協程Goroutine和通道channel
Go語言併發協程Goroutine
1.1 Go語言競爭狀態
有併發,就有資源競爭,如果兩個或者多個 goroutine 在沒有相互同步的情況下,訪問某個共享的資源,比如同時對該資源進行讀寫時,就會處於相互競爭的狀態,這就是併發中的資源競爭。
併發本身並不複雜,但是因為有了資源競爭的問題,就使得我們開發出好的併發程式變得複雜起來,因為會引起很多莫名其妙的問題。
以下程式碼就會出現競爭狀態:
import ( "fmt" "runtime" "sync" ) var ( count int32 wg sync.WaitGroup ) func main() { wg.Add(2) go incCount() go incCount() wg.Wait() fmt.Println(count) } func incCount() { defer wg.Done() for i := 0; i < 2; i++ { value := count runtime.Gosched() value++ count = value } }
count 變數沒有任何同步保護,所以兩個 goroutine 都會對其進行讀寫,會導致對已經計算好的結果被覆蓋,以至於產生錯誤結果。
程式碼中的 runtime.Gosched() 是讓當前 goroutine 暫停的意思,退回執行佇列runq,讓其他等待的 goroutine 執行,目的是為了使資源競爭的結果更明顯,下次執行暫停的goroutine時從斷點處開始。
分析程式執行過程:
g1 讀取到 count 的值為 0;
然後 g1 暫停了,切換到 g2 執行,g2 讀取到 count 的值也為 0;
g2 暫停,切換到 g1暫停的位置繼續執行,g1 對 count+1,count 的值變為 1;
g1 暫停,切換到 g2,g2 剛剛已經獲取到值 0,對其 +1,最後賦值給 count,其結果還是 1;
可以看出 g1 對 count+1 的結果被 g2 給覆蓋了,兩個 goroutine 都 +1 而結果還是 1。
通過上面的分析可以看出,之所以出現上面的問題,是因為兩個 goroutine 相互覆蓋結果。
所以我們對於同一個資源的讀寫必須是原子化的,也就是說,同一時間只能允許有一個 goroutine 對共享資源進行讀寫操作。 此例子的共享資源就是count
通過go build -race生成一個可以執行檔案,然後再執行這個可執行檔案,就可以檢測資源競爭資訊,看到打印出的檢測資訊。如下
================== WARNING: DATA RACE Read at 0x000000619cbc by goroutine 8: main.incCount() D:/code/src/main.go:25 +0x80// goroutine 8 在程式碼 25 行讀取共享資源value := count Previous write at 0x000000619cbc by goroutine 7: main.incCount() D:/code/src/main.go:28 +0x9f// goroutine 7 在程式碼 28行修改共享資源count=value Goroutine 8 (running) created at: main.main() D:/code/src/main.go:17 +0x7e Goroutine 7 (finished) created at: main.main() D:/code/src/main.go:16 +0x66//兩個 goroutine 都是從 main 函式的 16、17 行通過 go 關鍵字啟動的。 ================== 4 Found 1 data race(s)
1.2 鎖住共享資源
Go語言提供了傳統的同步 goroutine 的機制,就是對共享資源加鎖。atomic 和 sync 包裡的一些函式就可以對共享的資源進行加鎖操作。
1.2.1 原子函式
原子函式能夠以很底層的加鎖機制來同步訪問整型變數和指標
import ( "fmt" "runtime" "sync" "sync/atomic" ) var ( counter int64 wg sync.WaitGroup ) func main() { wg.Add(2) go incCounter(1) go incCounter(2) wg.Wait() //等待goroutine結束 fmt.Println(counter) } func incCounter(id int) { defer wg.Done() for count := 0; count < 2; count++ { atomic.AddInt64(&counter,1) //安全的對counter加1 runtime.Gosched() } }
上述程式碼中使用了 atmoic 包的 AddInt64 函式,這個函式會同步整型值的加法,方法是強制同一時刻只能有一個 gorountie 執行並完成這個加法操作。
另外兩個有用的原子函式是 LoadInt64 和 StoreInt64。這兩個函式提供了一種安全地讀和寫一個整型值的方式。下面的程式碼就使用了 LoadInt64 和 StoreInt64 函式來建立一個同步標誌,這個標誌可以向程式裡多個 goroutine 通知某個特殊狀態。
import ( "fmt" "sync" "sync/atomic" "time" ) var ( shutdown int64 wg sync.WaitGroup ) func main() { wg.Add(2) go doWork("A") go doWork("B") time.Sleep(1 * time.Second) fmt.Println("Shutdown Now") atomic.StoreInt64(&shutdown,1) wg.Wait() } func doWork(name string) { defer wg.Done() for { fmt.Printf("Doing %s Work\n",name) time.Sleep(250 * time.Millisecond) if atomic.LoadInt64(&shutdown) == 1 { fmt.Printf("Shutting %s Down\n",name) break } } } --output-- Doing A Work Doing B Work Doing B Work Doing A Work Doing A Work Doing B Work Doing B Work Doing A Work//前8行順序每次執行時都不一樣 Shutdown Now Shutting A Down Shutting B Down//A和B都shut down後,由wg.Done()把計數器置0
上面程式碼中 main 函式使用 StoreInt64 函式來安全地修改 shutdown 變數的值。如果哪個 doWork goroutine 試圖在 main 函式呼叫 StoreInt64 的同時呼叫 LoadInt64 函式,那麼原子函式會將這些呼叫互相同步,保證這些操作都是安全的,不會進入競爭狀態。
1.2.2 鎖
見上篇文章,上面的例子為保持同步,取消競爭,可照以下操作:
func incCounter(id int) { defer wg.Done() for count := 0; count < 2; count++ { //同一時刻只允許一個goroutine進入這個臨界區 mutex.Lock() { value := counter runtime.Gosched()//退出當前goroutine,排程器會再次分配這個 goroutine 繼續執行。 value++ counter = value } mutex.Unlock() //釋放鎖,允許其他正在等待的goroutine進入臨界區 } }
1.3 通道chan
統統將通道兩端的goroutine理解為生產者-消費者模式。
通道的資料接收一共有以下 4 種寫法。
阻塞接收資料
阻塞模式接收資料時,將接收變數作為<-操作符的左值,格式如下:
data := <-ch
執行該語句時將會阻塞,直到接收到資料並賦值給 data 變數。
2) 非阻塞接收資料
使用非阻塞方式從通道接收資料時,語句不會發生阻塞,格式如下:
data,ok := <-ch
data:表示接收到的資料。未接收到資料時,data 為通道型別的零值。
ok:表示是否接收到資料。
非阻塞的通道接收方法可能造成高的 CPU 佔用,因此使用非常少。如果需要實現接收超時檢測,可以配合 select 和計時器 channel 進行
3) 迴圈接收資料
import ( "fmt" "time" ) func main() { // 構建一個通道,這裡有沒有緩衝都可,因為是收了就發,無需阻塞等待 ch := make(chan int) // 開啟一個併發匿名函式 go func() { // 從3迴圈到0 for i := 3; i >= 0; i-- { // 傳送3到0之間的數值 ch <- i // 每次傳送完時等待 time.Sleep(time.Second) } }() // 遍歷接收通道資料 for data := range ch { // 列印通道資料 fmt.Println(data) // 當遇到資料0時,退出接收迴圈 if data == 0 { break } } } --output--
1.3.1 單向通道
ch := make(chan int) // 宣告一個只能寫入資料的通道型別,並賦值為ch var chSendOnly chan<- int = ch 或 ch := make(chan<- int) //宣告一個只能讀取資料的通道型別,並賦值為ch var chRecvOnly <-chan int = ch 或 ch := make(<-chan int)
1.3.2 優雅的關閉通道
1.3.3 無緩衝的通道
如果兩個 goroutine 沒有同時準備好,通道會導致先執行傳送或接收操作的 goroutine 阻塞等待。(阻塞指的是由於某種原因資料沒有到達,當前協程(執行緒)持續處於等待狀態,直到條件滿足才解除阻塞)這種對通道進行傳送和接收的互動行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。
在網球比賽中,兩位選手會把球在兩個人之間來回傳遞。選手總是處在以下兩種狀態之一,要麼在等待接球,要麼將球打向對方。可以使用兩個 goroutine 來模擬網球比賽,並使用無緩衝的通道來模擬球的來回
// 這個示例程式展示如何用無緩衝的通道來模擬 // 2 個goroutine 間的網球比賽 package main import ( "fmt" "math/rand" "sync" "time" ) // wg 用來等待程式結束 var wg sync.WaitGroup func init() { rand.Seed(time.Now().UnixNano()) } // main 是所有Go 程式的入口 func main() { // 建立一個無緩衝的通道 court := make(chan int) // 計數加 2,表示要等待兩個goroutine wg.Add(2) // 啟動兩個選手 go player("Nadal",court) go player("Djokovic",court) // 發球 court <- 1 // 等待遊戲結束 wg.Wait() } // player 模擬一個選手在打網球 func player(name string,court chan int) { // 在函式退出時呼叫Done 來通知main 函式工作已經完成 defer wg.Done() for { // 等待球被擊打過來 ball,ok := <-court if !ok { // 如果通道被關閉,我們就贏了 fmt.Printf("Player %s Won\n",name) return } // 選隨機數,然後用這個數來判斷我們是否丟球 n := rand.Intn(100) if n%13 == 0 { fmt.Printf("Player %s Missed\n",name) // 關閉通道,表示我們輸了 close(court) return } // 顯示擊球數,並將擊球數加1 fmt.Printf("Player %s Hit %d\n",name,ball) ball++ // 將球打向對手,為啥這裡是把ball傳送到另一個go協程? //因為court無緩衝,此時另一個go協程正好在等待接收court內的值,所以此時轉向另一個go協程程式碼 court <- ball } }
1.3.4 有緩衝的通道
有緩衝的通道是一種在被接收前能儲存一個或者多個值的通道。這種型別的通道並不強制要求 goroutine 之間必須同時完成傳送和接收,傳送和接受的阻塞條件為只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩衝區容納被髮送的值時,傳送動作才會阻塞。
有緩衝的通道和無緩衝的通道之間的一個很大的不同:無緩衝的通道保證進行傳送和接收的 goroutine 會在同一時間進行資料交換;有緩衝的通道沒有這種保證。
為什麼要給通道限制緩衝區大小?
通道(channel)是在兩個 goroutine 間通訊的橋樑。使用 goroutine 的程式碼必然有一方提供資料,一方消費資料。當提供資料一方的資料供給速度大於消費方的資料處理速度時,如果通道不限制長度,那麼記憶體將不斷膨脹直到應用崩潰。因此,限制通道的長度有利於約束資料提供方的供給速度,供給資料量必須在消費方處理量+通道長度的範圍內,才能正常地處理資料。
1.3.5 channel超時機制
select 機制不是專門為超時而設計的,卻能很方便的解決超時問題,因為 select 的特點是隻要其中有一個 case 已經完成,程式就會繼續往下執行,而不會考慮其他 case 的情況。
基本語句為:
每個 case 語句裡必須是一個 IO 操作,
select { case <-chan1: // 如果chan1成功讀到資料,則進行該case處理語句 case chan2 <- 1: // 如果成功向chan2寫入資料,則進行該case處理語句 default: // 如果上面都沒有成功,則進入default處理流程 }
例子,注意之所以輸出5個num,是因為select裡的time.After在這裡的意思是ch通道無值可以接收的時候的3s後才print超時,即最多ch通道最多阻塞等待3s
func main() { ch := make(chan int) quit := make(chan bool) //新開一個協程 go func() { for { select { case num := <-ch: fmt.Println("num = ",num) case <-time.After(3 * time.Second): fmt.Println("超時") quit <- true } } }() //別忘了() for i := 0; i < 5; i++ { ch <- i time.Sleep(time.Second)//主協程進入休眠狀態,等待上面的go協程執行並進入阻塞等待狀態,就這樣來回執行,並通過chan通訊 } <-quit fmt.Println("程式結束") } --output-- num = 0 num = 1 num = 2 num = 3 num = 4 超時 程式結束
以上為個人經驗,希望能給大家一個參考,也希望大家多多支援我們。如有錯誤或未考慮完全的地方,望不吝賜教。