Go並發模式:管道與取消
關鍵字:Go語言,管道,取消機制,並發,sync.WaitGroup,包引用,通道,defer,select
GO並發模式:管道與取消
簡介
Go的並發能力可以使構建一個流數據管道變得非常容易,並且可以高校地使用機器I/O和多核處理器。這篇文章展示了一些例子,包括管道,對操作失敗的處理技術。
管道的概念
在Go裏,並沒有正式的管道的定義,它只是眾多並發程序其中的一個。通俗來講,一個管道是一系列由通道連接的階段,每個階段都是一組運行著同樣函數的goroutine。在每個階段裏,goroutine在幹著:
- 通過接入通道(inbound channels)接收上遊流下來的值
- 對這些數據執行某個函數,通常會產生新的值
- 通過導出通道(outbound channels)下遊發送值
- 第一個階段也叫source或者producer
- 最後一個階段也叫sink或者consumer
以上這兩個階段都只能有一個通道,或者是接入通道或者是導出通道,不能同時擁有這兩種。而其他每個階段都可以共同擁有任意數量的接入通道和導出通道。
一個用來學習的例子
下面我們將展開一個簡單的管道例子,來闡述其中的思想和技術,後面會有實際的例子。
平方函數
直接看代碼中註釋。
註意goroutine是函數體內並發,有一個殼sandbox扣著它。
// 要想run,必須package main,默認是文件夾目錄名,要更改一下 package main import "fmt" // 設想一個擁有三個階段的管道 /* * First Stage: gen * params: 一個以逗號分隔的整數列表,數量不限 * return: 一個通道,包含參數中整數列表的通道 */ func gen(nums ... int) <-chan int { out := make(chan int) // 通過一個goroutine來將參數中的每個整數發送到通道中去。 go func() { for _, n := range nums { out <- n } close(out) // close方法作為上面的for循環的終止條件,不能省略。 }() return out } /* * Second Stage: sq * params: 一個包含參數中整數列表的通道 * return: 一個包含將參數通道中每個整數平方後的列表的通道 * note: 因為參數和返回值的類型都是相同的整型通道,所以可以反復嵌套該方法。 */ func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n // 平方 } close(out) }() return out } /* * Final Stage: main * 是一個main函數,沒有參數也沒有返回值,它相當於客戶端調用 */ func main() { c := gen(2, 3) // 建立通道 out := sq(c) // 通道處理 // 上面傳入兩個值2和3,那麽這裏就要對應的消費兩次輸出 fmt.Println(<-out) fmt.Println(<-out) // 嵌套sq for n := range sq(sq(gen(1, 2, 4, 5))) { fmt.Println(n) } } // output: // 4 // 9 // 1 // 16 // 256 // 625
Fan-out和Fan-in
- Fan-out,扇出。多個函數可以讀取同一個通道直到該通道關閉。可讓一群工人並用CPU和IO
- Fan-in,扇入。一個函數可以讀取多個輸入,每個輸入被多路復用到一個獨立的通道上,當所有輸入被關閉時,這個通道也會被關閉,同時它也會關掉這個函數的使用權。
下面我們將運行連個sq函數的實例,都會讀取同一個輸入通道,我們將使用一個新函數,叫做merge,來扇入多個結果。
向一個已關閉的通道發送值,會引起通道panic錯誤,所以引入了sync.WaitGroup功能來控制當所有發送行為結束以後關閉通道。
sync.WaitGroup
sync.WaitGroup像java的倒計時鎖,首先我們定義它的Wait方法設置一個鎖到某個並發程序中,然後通過Add方法定義計數器大小CounterSize,該大小為最多發送數據到通道的執行次數,每次執行結束要通過Done方法來使CounterSize減一,直到CounterSize為0,上面我們定義的Wait才會釋放鎖。
註意,WaitGroup的計數器大小CounterSize在初始化時默認為1,也就是說沒調用Add之前,需要一次Done方法執行以後,Wait鎖才會釋放。
merge函數
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
// 定義一個獨立通道out,將接收所有值
out := make(chan int)
// 將通道中所有值轉到out
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
// 將merge參數中所有通道的值都合到唯一通道out上去
for _, c := range cs {
go output(c)
}
// 啟動一個額外的goroutine(不會按照代碼順序執行,而是一進到merge就會啟動)來等待直到所有通道Done以後關閉那個唯一通道out。
go func() {
wg.Wait()// 直到wg全都Done了才會繼續執行。
close(out)
}()
return out
}
Go的包引用問題
當我們要使用其他Go文件內部的函數時,會有兩種處理方法:
- 將函數綁定到某個type下,然後在調用時創建那個type的實例即可調用。
- 將函數名首字母大寫,我們就可以通過包名調用了。
以上兩種方法都會存在一個問題,就是包引用問題,如果你找不到源碼位置,調用其函數就無從談起,那麽如何正確的引用包呢?
註意,最容易引發混亂的是main函數,因為main函數是可執行Go文件的必須元素,同時必須是指定package也為main,因此我們盡量不要在main函數所在的Go文件中添加與main無關的內容,否則我們很難通過包名或者文件名定位函數的意思。
註意,Go中最沒用的就是Go文件名了,包引用都是通過package。
正確的引用包是:將被調用函數所在文件,聲明package為其所在文件夾名字,
註意,所有的該文件夾下的Go文件的package聲明必須為同一個,不能出現第二個值,對外部調用者來講,這些文件好似在一起,都是從一個package讀取,並無區分。
然後在調用函數的地方import進來被調用函數聲明的package即可。
所以總結一下,文件夾名即包名,文件夾內給Go文件起名要能夠解釋清楚文件內容,main函數文件指定到有意義的文件夾下,導入所需函數包。
main函數
func main() {
in := pipeline.Gen(2, 3)
c1 := pipeline.Sq(in)
c2 := pipeline.Sq(in)
// 將c1和c2通道內的值合並到一起
for n := range merge(c1, c2) {
fmt.Println(n)
}
}
// Output:
// 4
// 9
等價於
out:= merge(c1,c2)
fmt.Println(<-out)
fmt.Println(<-out)
fmt.Println(<-out)// 第三次輸出,通道已無值,輸出零值,如果通道輸出完畢時即關閉的話,這一行會報錯
// Output:
// 4
// 9
// 0
發現問題?發送次數少於接收次數
上面的管道函數有一個模式:
- 所有的發送操作完成時,階段會關閉他們的導出通道。
- 階段會一直從導入通道中接收值,直到那些通道被關閉。
這個模式允許每個接收的階段可以被作為一個range循環寫入,並且保證一旦所有的值都已經成功發送下遊,所有的goroutine退出。
但是在真實的管道裏,階段不會總是能接收到所有的導入值。有時候這是由於一個設計:
接收者可能只需要一個值的子集來取得進展。
更常見的是,一個階段早早退出是因為一個導入值代表了一個更早階段的error。在這兩種情況下,接收方不應該等待其余值的到達,並且我們想要更早的階段來停止那些後期階段不需要的生產時期的值。
在我們的例子中,
out:= merge(c1,c2)
fmt.Println(<-out)
// Output:
// 4
實際上out通道中還有一個9沒有被輸出,通道的值此時沒有被完全消費,這時goroutine就會不斷嘗試發送該值,並且會被無限期阻塞。
這是一個資源泄露,goroutine消耗內存和運行時資源,並且在goroutine棧中的堆引用會一直防止數據被垃圾收集器回收。goroutine不可被垃圾收集,只能必須靠自己exit。
所以,我們需要找到方式,能夠在下遊階段接收所有導入值失敗的時候,上遊階段的管道仍舊能夠退出:
- 一種方式是改變導出通道讓它又有一個buffer緩沖區,一個緩沖區能夠持有一個固定數量的值,如果緩沖區內仍有空間,發送操作就立即完成。
緩沖區的內容我們在前面的文章中有仔細介紹。
總之就是可以釋放goroutine的發送操作到緩沖區,不會被無限期阻塞。
在我們的管道中,返回到被阻塞的goroutine,我們可能考慮到添加一個緩沖區到merge函數返回的導出通道:
out := make(chan int, 1)// 增加一個緩沖區,可以存放通道中未發送的值
但是問題仍在發生,我們這裏是因為知道我們上面只寫了一遍發送,而通道已知有兩次接收值,所以我們可以這麽幹,但是這個代碼是不好的,易碎的,一旦條件發生改變,就要對代碼進行調整。
因此,我們需要為下遊階段提供一種方式來象征發送者,來停止接收輸入。
明確的取消機制
當main函數決定退出,而不再接收任何out通道的值的時候,它必須告訴上遊的goroutine,放棄他們試圖發送的值。
在一個通道中如此操作發送值,被稱作done。
它發送兩個值因為有兩個潛在的阻塞發送者。
我們修改merge,給它加入一個參數是struct{}結構體通道。然後修改merge中的output函數,將原來的
out <- n:
替換為:
select {
case out <- n:
case <-done:
}
意思是:如果n還有未發送的值,就正常發送,如果done有未發送的值就發送done。然後我們再修改一下main函數:
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out)
done <- struct{}{}
done <- struct{}{}
當out只被輸出一次的時候,此時循環還剩兩次(總共三次,因為merge函數的參數有三個通道,會循環三次),為了避免循環阻塞在out輸出的位置,我們給done通道傳入了結構體零值,merge函數中那個循環就會放棄發送out值,而去執行done的發送。
但是問題仍在繼續,這裏仍舊是因為我們預知通道接收次數,以及發送放空次數,所以可以寫出這個順序和次數,這仍舊是易碎的,本質上除了讓我們學習了一下這種寫法,與上面發生的無異。
我們需要一種方式,可以在未知goroutine數量,未知通道大小的情況下,隨時按需阻止下遊階段發送未發送完畢的通道。
因為接收操作在一個封閉的通道可以總是立即執行,產生類元素的零值。
這就意味著main函數能夠對所有被done通道關閉的發送者解除阻塞。這實際上是一個廣播信號發送者。我們擴展管道功能的each來接收done作為一個參數來安排通過defer來延遲關閉,以便所有的main函數的返回路徑能夠發送信號到管道階段去退出。
先看merge函數:
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
我們在for循環前面加入了一行sync.WaitGroup的Done的延遲方法,然後修改了select內部當done可被輸出時,直接結束merge函數(更別提跳出循環了),直接執行defer的wg.Done去掉一次計數器數值。然後看main函數:
func main() {
done := make(chan struct{})
defer close(done)
in := pipeline.Gen(2, 3)
c1 := pipeline.Sq(in)
c2 := pipeline.Sq(in)
out := merge(done, c1, c2)
fmt.Println(<-out)
}
首先我們去掉了done通道的緩沖區,加了一行關閉done通道的延遲操作。當代碼執行玩fmt的一次輸出以後,main函數執行完畢,會調用defer關閉done通道,回到merge函數中,done通道被關閉以後,case ->done被執行merge函數執行完畢,執行wg.Done()。
總結
本文詳細闡述了Go管道的概念,是有三組動作:生產通道,處理通道,使用通道,這三組動作實現了Go的管道。通過一個例子我們搞清楚了管道的含義,接著又介紹了Fan-out,是關於多個函數對同一個通道的操作,以及一個函數對多個通道的操作(例子中使用了merge,將多個通道合並為一個)。這期間,我們研究了sync.WaitGroup以及Go語言中的包引用特性。最後,我們在例子中發現了管道並發的問題,並循序漸進地找到了解決方法,在此期間,讓我們加深了對defer,管道,通道,select的理解。
參考資料
- Go官方文檔
源碼位置
更多請轉到醒者呆的博客園
Go並發模式:管道與取消