1. 程式人生 > >golang 併發設計模式(二)--管道模式1

golang 併發設計模式(二)--管道模式1

本文摘錄了許式偉 《Go,基於連線與組合的語言》部分內容,為了便於理解,我在其後端寫了個完整的示例程式幫助理解,這篇文章 一是展示go在並行程式設計中的偉大,也是理解和學習閉包的活的教科書

---------------------------------------------------------------------------------------------------------------------------------------------------------------------

讓我們從Unix談起。Go語言與Unix、C語言有著極深的淵源。Go語言的領袖們參與甚至主導了Unix和C語言的設計。Ken Thompson 甚至算得上Unix和C語言的鼻祖。Go語言亦深受Unix和C語言的設計哲學影響。

在Unix世界裡,元件就是應用程式(app),每個app可大體抽象為:

  • 輸入:stdin(標準輸入), params(命令列引數)
  • 輸出:stdout(標準輸出)
  • 協議:text (data stream)

不同的應用程式(app)如何連線?答案是:管道(pipeline)。在Unix世界中大家對這樣的東西已經很熟悉了:

app1 params1 | app2 params2

通過管道(pipeline),可以將一個應用程式的輸出(stdout)轉換為另一個應用程式的輸入(stdin)。更為神奇的一點,是這些應用程式是並行執行的。app1每產生一段輸出,立即會被app2所處理。所以管道(pipeline)稱得上是最古老,同時也是極其優秀的並行設施,簡單而強大。

需要注意的是,Unix世界中不同應用程式直接是鬆散耦合的。上游app的輸出是xml還是json,下游app需要知曉,但並無任何強制的約束。同一輸出,不同的下游app,對協議的理解甚至都可能並不相同。例如,上游app輸出一段xml文字,對於某個下游app來說,是一顆dom樹,但對linecount程式來說只是一個多行的文字,對於英文單詞詞頻統計程式來說,是一篇英文文章。

為了方便理解,我們先嚐試在Go語言中模擬整個Unix的管道(pipeline)機制。首先是應用程式(app),我們抽象為:

func(in io.Reader, out io.Writer, args []string)

我們按下圖來對應Unix與Go程式碼的關係:

也就是說,Unix 中的

app1 params1 | app2 params2

對應Go語言中是:

pipe( bind(app1, params1), bind(app2, params2) )

其中,bind 函式實現如下:

func bind(
    app func(in io.Reader, out io.Writer, args []string),
    args []string
) func(in io.Reader, out io.Writer) {

    return func(in io.Reader, out io.Writer) {
        app(in, out, args)
    }
}

要理解bind函式,需要先理解“閉包”。Go語言中,應用程式以一個閉包的形式體現。如果你熟悉函數語言程式設計,不難發現,這個bind函式其實就是所謂的柯里化(currying)。

pipe函式如下:

func pipe(
    app1 func(in io.Reader, out io.Writer),
    app2 func(in io.Reader, out io.Writer)
) func(in io.Reader, out io.Writer) {

    return func(in io.Reader, out io.Writer) {
        pr, pw := io.Pipe()
        defer pw.Close()
        go func() {
            defer pr.Close()
            app2(pr, out)
        }()
        app1(in, pw)
    }
}

要理解pipe函式,除了“閉包”外,需要知曉defer關鍵字和goroutine(go關鍵字)。defer語句會在函式退出時執行(無論是否發生了異常),通常用於資源的清理操作(比如關閉檔案控制代碼等)。有了defer語句,Go語言中的錯誤處理程式碼顯得非常優雅。在一個正常的函式呼叫前加上go關鍵字,就會使得該函式在新的goroutine中並行執行。理解了這些背景,這個pipe函式不難理解,無非是:先建立一個管道,讓app1讀入資料(in),並向管道的寫入端(pw)輸出,啟動一個新goroutine,讓app2從管道的讀入端讀取資料,並將處理結果輸出(out)。這樣得到的app就是app1和app2的組合了。

你甚至可以對多個app進行組合:

func pipe(apps ...func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {

    if len(apps) == 0 { return nil }
    app := apps[0]
    for i := 1; i < len(apps); i++ {
        app1, app2 := app, apps[i]
        app = func(in io.Reader, out io.Writer) {
            pr, pw := io.Pipe()
            defer pw.Close()
            go func() {
                defer pr.Close()
                app2(pr, out)
            }()
            app1(in, pw)
        }
    }
    return app
}

我們舉個比較實際的例子,假設我們有2個應用程式tar(打包)、gzip(壓縮):

  • func tar(io.Reader, out io.Writer, files []string)
  • func gzip(in io.Reader, out io.Writer)

那麼打包並壓縮的程式碼是:

pipe( bind(tar, files), gzip )(nil, out)

通過對管道(pipeline)的模擬我們可以看出,Go語言對並行支援是非常強大的,這主要得益於Go的輕量級程序(goroutine)。

例項程式,幫助理解管道:

package main

import (
	"io"
	"os"
	"bufio"
	"bytes"
	"fmt"
	"strconv"
)

//bind函式主要是用來為pipe函式整合用的,通過將閉包將函式簽名變成pipe所需的樣子
//返回一個函式閉包,將一個函式字面量app和字串slice 傳入其中
func bind(app func(in io.Reader, out io.Writer, args []string), args []string) func(in io.Reader, out io.Writer) {
	return func(in io.Reader, out io.Writer) {
		app(in, out, args)
	}
}

//將兩個函式插入到管道的中間,呼叫者只需呼叫pipe返回的函式字面量,並傳入管道的首尾兩端,即可實現管道
//返回一個新的函式閉包
func pipe(app1 func(in io.Reader, out io.Writer), app2 func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {
	return func(in io.Reader, out io.Writer) {
		pr, pw := io.Pipe()
		defer pw.Close()
		go func() {
			defer pr.Close()
			app2(pr, out)
		}()
		app1(in, pw)
	}
}

//讀取args slice的每個字串,將其作為檔名,讀取檔案,並在檔案的每一行首部加上行號,寫入到out中
//此處in沒有使用到,主要是為了保證管道定義的一致性
func app1(in io.Reader, out io.Writer, args []string) {
	for _, v := range args {
		//fmt.Println(v)
		file, err := os.Open(v)
		if err != nil {
			continue
		}
		defer file.Close()
		buf := bufio.NewReader(file)
		for i:=1; ;i++{
			line, err := buf.ReadBytes('\n')
			if err != nil {
				break
			}
			linenum := strconv.Itoa(i)
			nline := []byte(linenum + " ")
			nline = append(nline, line...)
			out.Write(nline)
		}
	}
}

//app2 主要是將位元組流轉化為大寫,中文可能會有點問題,不過主要是演示用,重在理解思想
//read from in, convert byte to Upper ,write the result to out
func app2(in io.Reader, out io.Writer) {
	rd := bufio.NewReader(in)
	p := make([]byte, 10)
	for {
		n, _ := rd.Read(p)
		if n == 0 {
			break
		}
		t := bytes.ToUpper(p[:n])
		out.Write(t)
	}
}

func main() {
	args := os.Args[1:]
	for _, v := range args {
		fmt.Println(v)
	}
    p := pipe(bind(app1, args), app2)
	p(os.Stdin, os.Stdout)
}