golang 併發設計模式(二)--管道模式1
本文摘錄了許式偉 《Go,基於連線與組合的語言》部分內容,為了便於理解,我在其後端寫了個完整的示例程式幫助理解,這篇文章 一是展示go在並行程式設計中的偉大,也是理解和學習閉包的活的教科書
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
讓我們從Unix談起。Go語言與Unix、C語言有著極深的淵源。Go語言的領袖們參與甚至主導了Unix和C語言的設計。Ken Thompson 甚至算得上Unix和C語言的鼻祖。Go語言亦深受Unix和C語言的設計哲學影響。
在Unix世界裡,元件就是應用程式(app),每個app可大體抽象為:
- 輸入:stdin(標準輸入), params(命令列引數)
- 輸出:stdout(標準輸出)
- 協議:text (data stream)
不同的應用程式(app)如何連線?答案是:管道(pipeline)。在Unix世界中大家對這樣的東西已經很熟悉了:
app1 params1 | app2 params2
通過管道(pipeline),可以將一個應用程式的輸出(stdout)轉換為另一個應用程式的輸入(stdin)。更為神奇的一點,是這些應用程式是並行執行的。app1每產生一段輸出,立即會被app2所處理。所以管道(pipeline)稱得上是最古老,同時也是極其優秀的並行設施,簡單而強大。
需要注意的是,Unix世界中不同應用程式直接是鬆散耦合的。上游app的輸出是xml還是json,下游app需要知曉,但並無任何強制的約束。同一輸出,不同的下游app,對協議的理解甚至都可能並不相同。例如,上游app輸出一段xml文字,對於某個下游app來說,是一顆dom樹,但對linecount程式來說只是一個多行的文字,對於英文單詞詞頻統計程式來說,是一篇英文文章。
為了方便理解,我們先嚐試在Go語言中模擬整個Unix的管道(pipeline)機制。首先是應用程式(app),我們抽象為:
func(in io.Reader, out io.Writer, args []string)
我們按下圖來對應Unix與Go程式碼的關係:
也就是說,Unix 中的
app1 params1 | app2 params2
對應Go語言中是:
pipe( bind(app1, params1), bind(app2, params2) )
其中,bind 函式實現如下:
func bind( app func(in io.Reader, out io.Writer, args []string), args []string ) func(in io.Reader, out io.Writer) { return func(in io.Reader, out io.Writer) { app(in, out, args) } }
要理解bind函式,需要先理解“閉包”。Go語言中,應用程式以一個閉包的形式體現。如果你熟悉函數語言程式設計,不難發現,這個bind函式其實就是所謂的柯里化(currying)。
pipe函式如下:
func pipe( app1 func(in io.Reader, out io.Writer), app2 func(in io.Reader, out io.Writer) ) func(in io.Reader, out io.Writer) { return func(in io.Reader, out io.Writer) { pr, pw := io.Pipe() defer pw.Close() go func() { defer pr.Close() app2(pr, out) }() app1(in, pw) } }
要理解pipe函式,除了“閉包”外,需要知曉defer關鍵字和goroutine(go關鍵字)。defer語句會在函式退出時執行(無論是否發生了異常),通常用於資源的清理操作(比如關閉檔案控制代碼等)。有了defer語句,Go語言中的錯誤處理程式碼顯得非常優雅。在一個正常的函式呼叫前加上go關鍵字,就會使得該函式在新的goroutine中並行執行。理解了這些背景,這個pipe函式不難理解,無非是:先建立一個管道,讓app1讀入資料(in),並向管道的寫入端(pw)輸出,啟動一個新goroutine,讓app2從管道的讀入端讀取資料,並將處理結果輸出(out)。這樣得到的app就是app1和app2的組合了。
你甚至可以對多個app進行組合:
func pipe(apps ...func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) { if len(apps) == 0 { return nil } app := apps[0] for i := 1; i < len(apps); i++ { app1, app2 := app, apps[i] app = func(in io.Reader, out io.Writer) { pr, pw := io.Pipe() defer pw.Close() go func() { defer pr.Close() app2(pr, out) }() app1(in, pw) } } return app }
我們舉個比較實際的例子,假設我們有2個應用程式tar(打包)、gzip(壓縮):
-
func tar(io.Reader, out io.Writer, files []string)
-
func gzip(in io.Reader, out io.Writer)
那麼打包並壓縮的程式碼是:
pipe( bind(tar, files), gzip )(nil, out)
通過對管道(pipeline)的模擬我們可以看出,Go語言對並行支援是非常強大的,這主要得益於Go的輕量級程序(goroutine)。
例項程式,幫助理解管道:
package main
import (
"io"
"os"
"bufio"
"bytes"
"fmt"
"strconv"
)
//bind函式主要是用來為pipe函式整合用的,通過將閉包將函式簽名變成pipe所需的樣子
//返回一個函式閉包,將一個函式字面量app和字串slice 傳入其中
func bind(app func(in io.Reader, out io.Writer, args []string), args []string) func(in io.Reader, out io.Writer) {
return func(in io.Reader, out io.Writer) {
app(in, out, args)
}
}
//將兩個函式插入到管道的中間,呼叫者只需呼叫pipe返回的函式字面量,並傳入管道的首尾兩端,即可實現管道
//返回一個新的函式閉包
func pipe(app1 func(in io.Reader, out io.Writer), app2 func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {
return func(in io.Reader, out io.Writer) {
pr, pw := io.Pipe()
defer pw.Close()
go func() {
defer pr.Close()
app2(pr, out)
}()
app1(in, pw)
}
}
//讀取args slice的每個字串,將其作為檔名,讀取檔案,並在檔案的每一行首部加上行號,寫入到out中
//此處in沒有使用到,主要是為了保證管道定義的一致性
func app1(in io.Reader, out io.Writer, args []string) {
for _, v := range args {
//fmt.Println(v)
file, err := os.Open(v)
if err != nil {
continue
}
defer file.Close()
buf := bufio.NewReader(file)
for i:=1; ;i++{
line, err := buf.ReadBytes('\n')
if err != nil {
break
}
linenum := strconv.Itoa(i)
nline := []byte(linenum + " ")
nline = append(nline, line...)
out.Write(nline)
}
}
}
//app2 主要是將位元組流轉化為大寫,中文可能會有點問題,不過主要是演示用,重在理解思想
//read from in, convert byte to Upper ,write the result to out
func app2(in io.Reader, out io.Writer) {
rd := bufio.NewReader(in)
p := make([]byte, 10)
for {
n, _ := rd.Read(p)
if n == 0 {
break
}
t := bytes.ToUpper(p[:n])
out.Write(t)
}
}
func main() {
args := os.Args[1:]
for _, v := range args {
fmt.Println(v)
}
p := pipe(bind(app1, args), app2)
p(os.Stdin, os.Stdout)
}