1. 程式人生 > >golang 併發設計模式(二)--管道模式管道和顯式取消

golang 併發設計模式(二)--管道模式管道和顯式取消

一、 引言

Go併發原語使得構建流式資料管道,高效利用I/O和多核變得簡單。這篇文章介紹了幾個管道例子,重點指出在操作失敗時的細微差別,並介紹了優雅處理失敗的技術。

二、 什麼是管道?

Go沒有正式的管道定義。管道只是眾多併發程式的一類。一般的,一個管道就是一些列的由channel連線起來的階段。每個階段都有執行相同邏輯的goroutine。在每個階段中,goroutine

· 從channel讀取上游資料

· 在資料上執行一些操作,通常會產生新的資料

· 通過channel將資料發往下游

每個階段都可以有任意個輸入channel和輸出channel,除了第一個和最有一個channel(只有輸入channel或只有輸出channel)。第一個步驟通常叫資料來源或者生產者,最後一個叫做儲存池或者消費者。

我們先從一個簡單的管道例子來解釋這些概念和技術,稍後我們會介紹一個更為複雜的例子。

數字的平方

假設管道有三個階段。

第一步,gen函式,是一個將數字列表轉換到一個channel中的函式。Gen函式啟動了一個goroutine,將數字傳送到channel,並在所有數字都發送完後關閉channel。

1

2

3

4

5

6

7

8

9

10

func gen(nums ...int) <-chan int {

    out := make(chan int)

    go func() {

        for _, n := range nums {

            out <- n

        }

        close(out)

    }()

    return out

}

第二個階段,sq,從上面的channel接收數字,並返回一個包含所有收到數字的平方的channel。在上游channel關閉後,這個階段已經往下游傳送完所有的結果,然後關閉輸出channel:

1

2

3

4

5

6

7

8

9

10

func sq(in <-chan int) <-chan int {

    out := make(chan int)

    go func() {

        for n := range in {

            out <- n * n

        }

        close(out)

    }()

    return out

}

main函式建立這個管道,並執行第一個階段,從第二個階段接收結果並逐個列印,直到channel被關閉。

1

2

3

4

5

6

7

8

9

func main() {

    // Set up the pipeline.

    c := gen(2, 3)

    out := sq(c)

    // Consume the output.

    fmt.Println(<-out) // 4

    fmt.Println(<-out) // 9

}

因為sq對輸入channel和輸出channel擁有相同的型別,我們可以任意次的組合他們。我們也可以像其他階段一樣,將main函式重寫成一個迴圈遍歷。

1

2

3

4

5

6

func main() {

    // Set up the pipeline and consume the output.

    for n := range sq(sq(gen(2, 3))) {

        fmt.Println(n) // 16 then 81

    }

}

扇出扇入(Fan-out, fan-in)

多個函式可以從同一個channel讀取資料,直到這個channel關閉,這叫扇出。這是一種多個工作例項分散式地協作以並行利用CPU和I/O的方式。

一個函式可以從多個輸入讀取並處理資料,直到所有的輸入channel都被關閉。這個函式會將所有輸入channel匯入一個單一的channel。這個單一的channel在所有輸入channel都關閉後才會關閉。這叫做扇入。

我們可以設定我們的管道執行兩個sq例項,每一個例項都從相同的輸入channel讀取資料。我們引入了一個新的函式,merge,來扇入結果:

1

2

3

4

5

6

7

8

9

10

11

12

func main() {

    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.

    c1 := sq(in)

    c2 := sq(in)

    // Consume the merged output from c1 and c2.

    for n := range merge(c1, c2) {

        fmt.Println(n) // 4 then 9, or 9 then 4

    }

}

merge函式為每一個輸入channel啟動一個goroutine,goroutine將資料拷貝到同一個輸出channel。這樣就將多個channel轉換成一個channel。一旦所有的output goroutine啟動起來,merge就啟動另一個goroutine,在所有輸入拷貝完畢後關閉輸出channel。
向一個關閉了的channel傳送資料會觸發異常,所以在呼叫close之前確認所有的傳送動作都執行完畢很重要。sync.WaitGroup型別為這種同步提供了一種簡便的方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

func merge(cs ...<-chan int) <-chan int {

    var wg sync.WaitGroup

    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output

    // copies values from c to out until c is closed, then calls wg.Done.

    output := func(c <-chan int) {

        for n := range c {

            out <- n

        }

        wg.Done()

    }

    wg.Add(len(cs))

    for _, c := range cs {

        go output(c)

    }

    // Start a goroutine to close out once all the output goroutines are

    // done.  This must start after the wg.Add call.

    go func() {

        wg.Wait()

        close(out)

    }()

    return out

}

三、 停止的藝術

我們所有的管道函式都遵循一種模式:

· 傳送者在傳送完畢時關閉其輸出channel

· 接收者持續從輸入管道接收資料直到輸入管道關閉。

這種模式使得每一個接收函式都能寫成一個range迴圈,保證所有的goroutine在資料成功傳送到下游後就關閉。

但是在真實的案例中,並不是所有的輸入資料都需要被接收處理。有些時候是故意這麼設計的:接收者可能只需要資料的子集就夠了;或者更一般的,因為輸入資料有錯誤而導致接收函式提早退出。上面任何一種情況下,接收者都不應該繼續等待後續的資料到來,並且我們希望上游函式停止生成後續步驟已經不需要的資料。

在我們的管道例子中,如果一個階段無法消費所有的輸入資料,那些傳送這些資料的goroutine就會一直阻塞下去:

1

2

3

4

5

6

7

    // Consume the first value from output.

    out := merge(c1, c2)

    fmt.Println(<-out) // 4 or 9

    return

    // Since we didn't receive the second value from out,

    // one of the output goroutines is hung attempting to send it.

}

這是一種資源洩漏:goroutine會佔用記憶體和執行時資源。goroutine棧持有的堆引用會阻止GC回收資源。而且goroutine不能被垃圾回收,必須主動退出。

我們必須重新設計管道中的上游函式,在下游函式無法接收所有輸入資料時退出。一種方法就是讓輸出channel擁有一定的快取。快取可以儲存一定數量的資料。如果快取空間足夠,傳送操作就會馬上返回:

1

2

3

4

c := make(chan int, 2) // buffer size 2

c <- 1  // succeeds immediately

c <- 2  // succeeds immediately

c <- 3  // blocks until another goroutine does <-c and receives 1

如果在channel建立時就知道需要傳送資料的數量,帶快取的channel會簡化程式碼。例如,我們可以重寫gen函式,拷貝一系列的整數到一個帶快取的channel而不是建立一個新的goroutine:

1

2

3

4

5

6

7

8

func gen(nums ...int) <-chan int {

    out := make(chan int, len(nums))

    for _, n := range nums {

        out <- n

    }

    close(out)

    return out

}

反過來我們看管道中被阻塞的goroutine,我們可以考慮為merge函式返回的輸出channel增加一個快取:

1

2

3

4

func merge(cs ...<-chan int) <-chan int {

    var wg sync.WaitGroup

    out := make(chan int, 1) // enough space for the unread inputs

    // ... the rest is unchanged ...

雖然這樣可以避免了程式中goroutine的阻塞,但這是很爛的程式碼。選擇快取大小為1取決於知道merge函式接收數字的數量和下游函式消費數字的數量。這是很不穩定的:如果我們向gen多傳送了一個數據,或者下游函式少消費了資料,我們就又一次阻塞了goroutine。

然而,我們需要提供一種方式,下游函式可以通知上游傳送者下游要停止接收資料。

四、 顯式取消

當main函式決定在沒有從out接收所有的資料而要退出時,它需要通知上游的goroutine取消即將傳送的資料。可以通過向一個叫做done的channel傳送資料來實現。因為有兩個潛在阻塞的goroutine,main函式會發送兩個資料:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

func main() {

    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.

    c1 := sq(in)

    c2 := sq(in)

    // Consume the first value from output.

    done := make(chan struct{}, 2)

    out := merge(done, c1, c2)

    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.

    done <- struct{}{}

    done <- struct{}{}

}

對傳送goroutine而言,需要將傳送操作替換為一個select語句,要麼out發生傳送操作,要麼從done接收資料。done的資料型別是空的struct,因為其值無關緊要:僅僅表示out需要取消傳送操作。output 繼續在輸入channel迴圈執行,因此上游函式是不會阻塞的。(接下來我們會討論如何讓迴圈提早退出)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {

    var wg sync.WaitGroup

    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output

    // copies values from c to out until c is closed or it receives a value

    // from done, then output calls wg.Done.

    output := func(c <-chan int) {

        for n := range c {

            select {

            case out <- n:

            case <-done:

            }

        }

        wg.Done()

    }

    // ... the rest is unchanged ...

這種方法有一個問題:每一個下游函式需要知道潛在可能阻塞的上游傳送者傳送報文的數量,以傳送響應的訊號讓其提早退出。跟蹤這些數量是無趣的而且很容易出錯。

我們需要一種能夠讓未知或無界數量的goroutine都能夠停止向下遊傳送資料的方法。在Go中,我們可以通過關閉一個channel實現。因為從一個關閉了的channel執行接收操作總能馬上成功,並返回相應資料型別的零值。

這意味著main函式僅通過關閉done就能實現將所有的傳送者解除阻塞。關閉操作是一個高效的對傳送者的廣播訊號。我們擴充套件管道中所有的函式接受done作為一個引數,並通過defer來實現相應channel的關閉操作。因此,無論main函式在哪一行退出都會通知上游退出。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

func main() {

    // Set up a done channel that's shared by the whole pipeline,

    // and close that channel when this pipeline exits, as a signal

    // for all the goroutines we started to exit.

    done := make(chan struct{})

    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.

    c1 := sq(done, in)

    c2 := sq(done, in)

    // Consume the first value from output.

    out := merge(done, c1, c2)

    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.

}

現在每一個管道函式在done被關閉後就可以馬上返回了。merge函式中的output可以在接收管道的資料消費完之前返回,因為output函式知道上游傳送者sq會在done關閉後停止產生資料。同時,output通過defer語句保證wq.Done會在所有退出路徑上呼叫。

這裡有個機制:

Ø 首先保證sq退出,sq退出,源頭的生產者就沒有了,merge裡面的 for range 就會退出;

Ø 如果恰巧阻塞到merge 的select裡面,也由於done已經關閉,對導致解除阻塞而退出;

(原因是:一般通知生產者停止生產時,消費者已經不去消費了case out <- n: 會阻塞,所以slect 中的

case<-done:

ruturn 

能夠執行 select 多個chan 都已經準備就緒,選擇是隨機的,這就說明sq肯定有機會退出)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {

    var wg sync.WaitGroup

    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output

    // copies values from c to out until c or done is closed, then calls

    // wg.Done.

    output := func(c <-chan int) {

        defer wg.Done()

        for  n := range c {

            select {

            case out <- n:

            case<-done:              //這裡使用的是閉包

                return

            }

        }

    }

    wg.Add(len(cs))

    for _, c := range cs {

        go output(c)

    }

    // Start a goroutine to close out once all the output goroutines are

    // done.  This must start after the wg.Add call.

    go func() {

        wg.Wait()

        close(out)

    }()

    return  out

}

類似的,sq也可以在done關閉後馬上返回。sq通過defer語句使得任何退出路徑都能關閉其輸出channel out。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

func sq(done <-chan struct{}, in <-chan int) <-chan int {

    out := make(chan int)

    go func() {

        defer close(out)

        for  n := range in {

            select {

            case out <- n * n:

            case<-done:

                return

            }

        }

    }()

    return out

}

· 管道構建的指導思想如下:

· 每一個階段在所有傳送操作完成後關閉輸出channel。

· 每一個階段持續從輸入channel接收資料直到輸入channel被關閉或者生產者被解除阻塞(譯者:生產者退出)

· 

· 管道解除生產者阻塞有兩種方法:

· 要麼保證有足夠的快取空間儲存將要被生產的資料;

· 要麼顯式的通知生產者消費者要取消接收資料;

經驗

一般情況下有傳送者主動關閉chan,特殊情況下才由消費者發訊號告知生產者關閉chan, 避免生產者的goroutine一直阻塞;

這裡使用了一個技巧就是:讀取被關閉的channel會立馬返回,是不會阻塞的,返回值是chan type的零值;select 與之結合天衣無縫的撲捉到關閉訊號然後,生產者停止生產退出關閉channel,釋放資源。

樹形摘要

讓我們來看一個更為實際的管道。

MD5是一個資訊摘要演算法,對於檔案校驗非常有用。命令列工具md5sum很有用,可以列印一系列檔案的摘要值。

12345%md5sum*.god47c2bbc28298ca9befdfbc5d3aa4e65  bounded.goee869afd31f83cbb2d10ee81b2b831dc  parallel.gob88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我們的例子程式和md5sum類似,但是接受一個單一的資料夾作為引數,列印該資料夾下每一個普通檔案的摘要值,並按路徑名稱排序。

1 2 3 4 5 %gorunserial.go. d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go ee869afd31f83cbb2d10ee81b2b831dc  parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我們程式的main函式呼叫一個工具函式MD5ALL,該函式返回一個從路徑名稱到摘要值的雜湊表,然後排序並輸出結果:

Go
1234567891011121314151617funcmain(){// Calculate the MD5 sum of all files under the specified directory,// then print the results sorted by path name.m,err:=MD5All(os.Args[1

相關推薦

golang 併發設計模式()--管道模式管道取消

一、 引言 Go併發原語使得構建流式資料管道,高效利用I/O和多核變得簡單。這篇文章介紹了幾個管道例子,重點指出在操作失敗時的細微差別,並介紹了優雅處理失敗的技術。 二、 什麼是管道? Go沒有正式的管道定義。管道只是眾多併發程式的一類。一般

golang 併發設計模式()--管道模式1

本文摘錄了許式偉 《Go,基於連線與組合的語言》部分內容,為了便於理解,我在其後端寫了個完整的示例程式幫助理解,這篇文章 一是展示go在並行程式設計中的偉大,也是理解和學習閉包的活的教科書 ---------------------------------------

golang 併發設計模式(一)--資源生成器模式

在應用系統中,常見的的應用場景就是呼叫一個生成器:生成訂單號,序列號,隨機數等。golang  goroutine為這種需求提供了強大的武器。1.簡單的生成器package main  import (          "fmt"        "math/rand")  func GenerateIntA

java設計模式() 結構模式之代理模式

1.結構模式(主要記錄類和物件的靜態結構)2.行為模式(描述通過物件間的合作形成的一種關係)3.建立型模式(用於生成物件) 1.代理模式是把一個實際存在的物件隱藏在一個與它有相同介面的代理者身後。代理者把這個實際的物件與外界溝通的渠道封裝起來,代理者把對方法的呼叫委託給實際的物件,

java設計模式() 結構模式之介面卡模式

1.結構模式(主要記錄類和物件的靜態結構) 2.行為模式(描述通過物件間的合作形成的一種關係) 3.建立型模式(用於生成物件) **定義: 介面卡模式把一個”錯誤“ 的介面轉換為所希望的形式,基於類的介面卡繼承了需適配的類,以此得到這個類的介面,同時介面卡繼承了它不需要的“

PHP設計模式():抽象類介面

Introduction 對於程式設計來說,對事物的抽象是一個老生常談的話題,抽象問題更利於面向物件程式設計以及程式設計模式。 和C/C++,Java,Python等語言一樣,PHP也支援面向物件,但是又有略微區別,如PHP支援在介面中定義常量,但是不支

Java併發集合()-ConcurrentSkipListMap分析使用

一、ConcurrentSkipListMap介紹 ConcurrentSkipListMap是執行緒安全的有序的雜湊表,適用於高併發的場景。ConcurrentSkipListMap和TreeMap,它們雖然都是有序的雜湊表。但是,第一,它們的執行緒安全機制不同,TreeMap是非執行緒安全的,而Concu

內建鎖鎖的區別(java併發程式設計第13章)

任何java物件都可以用作同步的鎖, 為了便於區分, 將其稱為內建鎖. JDK5.0引入了顯式鎖: Lock及其子類(如ReentrantLock, ReadWriteLock等).  內建鎖和顯式鎖的區別有: 1. 可中斷申請 如果使用synchronized

使用Newlife網絡庫管道模式解決數據粘包()

sub throw 數組 服務端 cit 需要 存在 bubuko reverse 上一篇我們講了 如何創建一個基本的Newlife網絡服務端 這邊我們來講一下如何解決粘包的問題 在上一篇總我們註冊了Newlife的管道處理器 ,我們來看看他是如何實現粘包處理的 svr

設計模式:MVC

模式 color div oid char sge 顯示 mod main 先附上部分代碼: /* *MVC 模式代表 Model-View-Controller(模型-視圖-控制器) 模式。這種模式用於應用程序的分層開發。 *Model(模型) - 模型代表一個存取數據

C++設計模式之狀態模式()

virtual alt 虛構函數 需求 rate names clas term delete 2、智能空調的設計與實現 某軟件公司將開發一套智能空調系統: 系統檢測到溫度處於20---30度之間,則切換到常溫狀態;溫度處於30---45度,則切換到制冷狀態;

淺析JAVA設計模式之工廠模式()

1.2 源碼 pub color post del tar pop south 1 工廠方法模式簡單介紹 工廠方法 (Factroy Method)模式:又稱多態性工廠模式(Polymorphic Factory),在這樣的模式中,核心工廠不再是一個詳

設計模式之工廠模式 ()

height align sta sys 12px pack arrow nbsp 靈活 工廠模式分為三大類 簡單工廠(SimpleFactory)工廠方法模式(Factory Method) 抽象工廠模式(Abstract Factory)動態工廠(Dynamic

ASP.NET 獲取IIS應用程序池的托管管道模式

其他 網站 托管 cto 應用程序池 要求 沒有 程序 利用 asp.net 中怎樣較為簡單的獲取網站程序池的托管管道模式 目前已知的方式是根據這個帖子https://github.com/kakalotte/... ,利用DirectoryEntry,但是程序權限要求太高

介紹MFC框架中涉及到的設計模式()

rac 並且 pos ack 有一個 聲明 blank blog fcm 接著上一篇《介紹MFC框架中涉及到的設計模式(一)》介紹 單例模式(Singleton Pattern) 單例模式是一種經常使用的軟件設計模式。在它的核心結構中僅僅包括一個

Java設計模式之工廠模式):抽象工廠模式

mar 模式 blank http left taxi ref www. app 2碳依5FVL冒傲3http://t.docin.com/etw488 am懦7鈉N山段9慌Q闌http://shequ.docin.com/ipu5657 iK1諾5N鍛認EUK剖嘲肆h

設計模式(簡單工廠模式)

rri err sum throw 數據 main div () highlight 學習筆記之設計模式二,簡單工廠; 假設要取得數據庫連接字符串, 1.創建抽象類: public abstract class ConnectString { //輸出字符

C#設計模式之十代理模式(Proxy Pattern)【結構型】

ride col 安全 .html 使用權 防火墻 一對多 tro 橋接 原文:C#設計模式之十二代理模式(Proxy Pattern)【結構型】一、引言 今天我們要講【結構型】設計模式的第七個模式,也是“結構型”設計模式中的最後一個模式,該模式是【代理模式】,英文名稱

C#設計模式備忘錄模式(Memento Pattern)【行為型】

his 備忘錄 很好 car 人的 成功率 構圖 設計模式的 就會 原文:C#設計模式之二十二備忘錄模式(Memento Pattern)【行為型】一、引言 今天我們開始講“行為型”設計模式的第十個模式,該模式是【備忘錄模式】,英文名稱是:Memento Pattern

檢測到在集成的托管管道模式下不適用的ASP.NET設置的解決方法(轉載)

blank span 轉載 sdn 是我 module error conf str 我們將ASP.NET程序從IIS6移植到IIS7,可能運行提示以下錯誤: HTTP 錯誤 500.23 - Internal Server Error 檢測到在集成的托管管道模式下不適