Golang併發模型:輕鬆入門流水線FAN模式
前一篇文章《Golang併發模型:輕鬆入門流水線模型》,介紹了流水線模型的概念,這篇文章是流水線模型進階,介紹FAN-IN和FAN-OUT,FAN模式可以讓我們的流水線模型更好的利用Golang併發,提高軟體效能。但FAN模式不一定是萬能,不見得能提高程式的效能,甚至還不如普通的流水線。我們先介紹下FAN模式,再看看它怎麼提升效能的,它是不是萬能的。
FAN-IN和FAN-OUT模式
Golang的併發模式靈感來自現實世界,這些模式是通用的,毫無例外,FAN模式也是對當前世界的模仿。以汽車組裝為例,汽車生產線上有個階段是給小汽車裝4個輪子,可以把這個階段任務交給4個人同時去做,這4個人把輪子都裝完後,再把汽車移動到生產線下一個階段。這個過程中,就有任務的分發,和任務結果的收集。其中任務分發是FAN-OUT,任務收集是FAN-IN。
- **FAN-OUT模式:多個goroutine從同一個通道讀取資料,直到該通道關閉。**OUT是一種張開的模式,所以又被稱為扇出,可以用來分發任務。
- **FAN-IN模式:1個goroutine從多個通道讀取資料,直到這些通道關閉。**IN是一種收斂的模式,所以又被稱為扇入,用來收集處理的結果。
FAN-IN和FAN-OUT實踐
我們這次試用FAN-OUT和FAN-IN,解決《Golang併發模型:輕鬆入門流水線模型》中提到的問題:計算一個整數切片中元素的平方值並把它打印出來。
producer()
保持不變,負責生產資料。squre()
- 修改
main()
,啟動3個square,這3個squre從producer生成的通道讀資料,這是FAN-OUT。 - 增加
merge()
,入參是3個square各自寫資料的通道,給這3個通道分別啟動1個協程,把資料寫入到自己建立的通道,並返回該通道,這是FAN-IN。
package main
import (
"fmt"
"sync"
)
func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close (out)
for _, n := range nums {
out <- i
}
}()
return out
}
func square(inCh <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range inCh {
out <- n * n
}
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
collect := func(in <-chan int) {
defer wg.Done()
for n := range in {
out <- n
}
}
wg.Add(len(cs))
// FAN-IN
for _, c := range cs {
go collect(c)
}
// 錯誤方式:直接等待是bug,死鎖,因為merge寫了out,main卻沒有讀
// wg.Wait()
// close(out)
// 正確方式
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := producer(1, 2, 3, 4)
// FAN-OUT
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumer
for ret := range merge(c1, c2, c3) {
fmt.Printf("%3d ", ret)
}
fmt.Println()
}
3個squre協程併發執行,結果順序是無法確定的,所以你得到的結果,不一定與下面的相同。
➜ awesome git:(master) ✗ go run hi.go
1 4 16 9
FAN模式真能提升效能嗎?
相信你心裡已經有了答案,可以的。我們還是使用老問題,對比一下簡單的流水線和FAN模式的流水線,修改下程式碼,增加程式的執行時間:
produer()
使用引數生成指定數量的資料。square()
增加阻塞操作,睡眠1s,模擬階段的執行時間。main()
關閉對結果資料的列印,降低結果處理時的IO對FAN模式的對比。
// hi_simple.go
package main
import (
"fmt"
)
func producer(n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- i
}
}()
return out
}
func square(inCh <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range inCh {
out <- n * n
// simulate
time.Sleep(time.Second)
}
}()
return out
}
func main() {
in := producer(10)
ch := square(in)
// consumer
for _ = range ch {
}
}
使用FAN模式的流水線:
// hi_fan.go
package main
import (
"sync"
"time"
)
func producer(n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- i
}
}()
return out
}
func square(inCh <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range inCh {
out <- n * n
// simulate
time.Sleep(time.Second)
}
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
collect := func(in <-chan int) {
defer wg.Done()
for n := range in {
out <- n
}
}
wg.Add(len(cs))
// FAN-IN
for _, c := range cs {
go collect(c)
}
// 錯誤方式:直接等待是bug,死鎖,因為merge寫了out,main卻沒有讀
// wg.Wait()
// close(out)
// 正確方式
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := producer(10)
// FAN-OUT
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumer
for _ = range merge(c1, c2, c3) {
}
}
多次測試,每次結果近似,結果如下:
- FAN模式利用了7%的CPU,而普通流水線CPU只使用了3%,FAN模式能夠更好的利用CPU,提供更好的併發,提高Golang程式的併發效能。
- FAN模式耗時10s,普通流水線耗時4s。在協程比較費時時,FAN模式可以減少程式執行時間,同樣的時間,可以處理更多的資料。
➜ awesome git:(master) ✗ time go run hi_simple.go
go run hi_simple.go 0.17s user 0.18s system 3% cpu 10.389 total
➜ awesome git:(master) ✗
➜ awesome git:(master) ✗ time go run hi_fan.go
go run hi_fan.go 0.17s user 0.16s system 7% cpu 4.288 total
也可以使用Benchmark進行測試,看2個型別的執行時間,結論相同。為了節約篇幅,這裡不再介紹,方法和結果貼在Gist了,想看的朋友瞄一眼,或自己動手搞搞。
FAN模式一定能提升效能嗎?
FAN模式可以提高併發的效能,那我們是不是可以都使用FAN模式?
不行的,因為FAN模式不一定能提升效能。
依然使用之前的問題,再次修改下程式碼,其他不變:
squre()
去掉耗時。main()
增加producer()的入參,讓producer生產10,000,000個數據。
// hi_simple.go
func square(inCh <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range inCh {
out <- n * n
}
}()
return out
}
func main() {
in := producer(10000000)
ch := square(in)
// consumer
for _ = range ch {
}
}
// hi_fan.go
package main
import (
"sync"
)
func square(inCh <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range inCh {
out <- n * n
}
}()
return out
}
func main() {
in := producer(10000000)
// FAN-OUT
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumer
for _ = range merge(c1, c2, c3) {
}
}
結果,可以跑多次,結果近似:
➜ awesome git:(master) ✗ time go run hi_simple.go
go run hi_simple.go 9.96s user 5.93s system 168% cpu 9.424 total
➜ awesome git:(master) ✗ time go run hi_fan.go
go run hi_fan.go 23.35s user 11.51s system 297% cpu 11.737 total
從這個結果,我們能看到2點。
- FAN模式可以提高CPU利用率。
- FAN模式不一定能提升效率,降低程式執行時間。
優化FAN模式
既然FAN模式不一定能提高效能,如何優化?
不同的場景優化不同,要依具體的情況,解決程式的瓶頸。
我們當前程式的瓶頸在FAN-IN,squre函式很快就完成,merge函式它把3個數據寫入到1個通道的時候出現了瓶頸,適當使用帶緩衝通道可以提高程式效能,再修改下程式碼
-
merge()
中的out
修改為:out := make(chan int, 100)
結果:
➜ awesome git:(master) ✗ time go run hi_fan_buffered.go
go run hi_fan_buffered.go 19.85s user 8.19s system 323% cpu 8.658 total
使用帶快取通道後,程式的效能有了較大提升,CPU利用率提高到323%,提升了8%,執行時間從11.7降低到8.6,降低了26%。
FAN模式的特點很簡單,相信你已經掌握了,如果記不清了看這裡,本文所有程式碼在該Github倉庫。
FAN模式很有意思,並且能提高Golang併發的效能,如果想以後運用自如,用到自己的專案中去,還是要寫寫自己的Demo,快去實踐一把。
完整示例程式碼
本文所有程式碼都在倉庫,可檢視完整示例程式碼:https://github.com/Shitaibin/golang_pipeline_step_by_step
併發系列文章推薦
下一篇,寫流水線中協程的“優雅退出”,歡迎關注。
關注公眾號,獲取最新Golang文章。 一起學Golang-分享有料的Go語言技術
- 如果這篇文章對你有幫助,不妨關注下我的Github,有文章會收到通知。
- 本文作者:大彬
- 如果喜歡本文,隨意轉載,但請保留此原文連結:http://lessisbetter.site/2018/11/28/golang-pipeline-fan-model/