1. 程式人生 > >golang標準庫 context的使用

golang標準庫 context的使用

可能 book 退出 就是 固然 sele pri type 但是

本文索引

  • 問題引入
  • context包簡介
  • 示例

問題引入

goroutine為我們提供了輕量級的並發實現,作為golang最大的亮點之一更是備受推崇。

goroutine的簡單固然有利於我們的開發,但簡單總是有代價的,考慮如下例子:

func httpDo(req *http.Request, resp *http.Response) {
  for {
    select {
    case <-time.After(5 * time.Second):
      // 從req讀取數據然後發送給resp

    // 其他的一些邏輯(如果有的話)
    }
  }
}

func startListener() {
  // start http listener
  for {
    req, resp := HTTPListener.Accept()
    go httpDo(req, resp)
  }
}

上面的例子中,goroutinehttpDo每隔5秒讀取一次請求數據並發送給響應鏈接,startListener則每收到一個請求就啟動一個goroutine去處理,雖然是偽代碼,不過你已經發現了這是golang處理請求等並發任務時的慣用模型。

看著不是很簡單嗎,簡單而又強大。確實如此,但有一個小問題。假如我的startListener崩潰了或者需要重新啟動,這時前面那些鏈接都需要斷開重連,那麽我們應該怎麽停止那些goroutine呢?

答案是做不到。原因很簡單,當我們使用go func()啟動一個goroutine後,除了channelsync包中的同步手段之外,我們沒有任何可以控制goroutine的方法。簡單的說,除非goroutine在函數體內return或者主goroutine終止運行,否則我們是不能通過外部手段幹擾goroutine使其終止的。因此在上述例子中那些goroutine無法終止,這會造成goroutine leak。開頭已經說過,goroutine足夠輕量,通常對於一個函數體不是死循環的goroutine來說我們大可不必關心它的退出操作,然而對於例子中的goroutine來說它會持續運行下去,雖然每個goroutine只占用很少的資源,但如果數量足夠大的話被浪費的資源是相當驚人的,而一個長時間運行的程序必然因為得不到釋放的資源而出問題。更為致命的是這種leak的goroutine可能還會造成邏輯上的錯誤從而引發更嚴重的問題。

當然,一點簡單的改造就可以避免問題,這也是goroutine的強大之處。前面我們提到channel等同步手段可以間接地控制goroutine,所以我們可以利用一個空chan來達到終止所有goroutine的目的:

func httpDo(req *http.Request, resp *http.Response, done <-chan struct{}) {
  for {
    select {
    case <-done:
      // 避免goroutine leak
      return
    case <-time.After(5 * time.Second):
      // 從req讀取數據然後發送給resp

    // 其他的一些邏輯(如果有的話)
    }
  }
}

func startListener() {
  // start http listener

  done := make(chan struct{})
  defer close(done)
  for {
    req, resp := HTTPListener.Accept()
    go httpDo(req, resp, done)
  }
}

修改過的程序我們使用一個chan struct{}變量進行控制,當startListener退出時(無論正常結束還是panic)done都會關閉,關閉後的chan會返回對應類型0值,於是goroutine的select會收到done關閉的信號,隨後跟著退出,goroutine leak被避免。

當然,這麽做不夠優雅,畢竟當startListener這樣的函數增多後我們不得不每次都寫大量重復的代碼,這樣會讓開發變得乏味。

所以golang1.7引入了context包用來優雅地退出goroutine。

context包簡介

golang為了實現優雅地退出goroutine,在1.7引入了context。雖然名字叫“上下文”(context)不過其實只是我們在上一節例子的包裝。

context.Context是一個接口:

type Context interface {
    // 返回超時時間(duration加上創建context對象時的時間),如果已經超時ok為true
    // 返回的時間也可以是自己設置的time.Time
    Deadline() (deadline time.Time, ok bool)

    // done信號,和上一節的做法一樣,這裏進行了一些包裝
    Done() <-chan struct{}

    // 如果Done未被關閉就返回nil。
    // 否則返回相應的錯誤,比如調用了cancel()會返回Canceled;超時會返回DeadlineExceeded
    Err() error

    // 可以給context設置一些值,使用方法和map類似,key需要支持==比較操作,value需要是並發安全的
    Value(key interface{}) interface{}
}

實現了Context接口的對象都是並發安全的(如果你自己實現了這個接口也必須確保並發安全)。

context的使用很簡單,首先在需要產生goroutine的函數中創建一個context對象,然後將其作為goroutine的第一個參數傳入,例如go func(ctx context.Context) {} (ctx),如果在goroutine裏還會運行新的goroutine,那麽就繼續傳遞這個context對象。

如此一來最初的那個context對象就被稱為parent, 其余goroutine中的被稱為關聯context,通過這種關系我們就可以把相關的goroutine聯系在一起。

對於一個作為parent的context對象來說它也必須基於一個parent來創建,所以context提供了兩個創建空context的函數:

func Background() Context
func TODO() Context

兩者都返回一個空context,一個context不會被取消(cancel),也不會超時。它們唯一的區別是TODO表示你的代碼正在準備使用context但仍然需要一些調整,這回告訴靜態代碼分析工具go vet不匯報某些context的使用錯誤,而通常我們應該使用Background產生的context來創建我們自己的context對象。

有了parent之後就可以創建我們需要的context對象了,context包提供了三種context,分別是是普通context,超時context以及帶值的context:

// 普通context,通常這樣調用ctx, cancel := context.WithCancel(context.Background())
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// 帶超時的context,超時之後會自動close對象的Done,與調用CancelFunc的效果一樣
// WithDeadline 明確地設置一個d指定的系統時鐘時間,如果超過就觸發超時
// WithTimeout 設置一個相對的超時時間,也就是deadline設為timeout加上當前的系統時間
// 因為兩者事實上都依賴於系統時鐘,所以可能存在微小的誤差,所以官方不推薦把超時間隔設置得太小
// 通常這樣調用ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

// 帶有值的context,沒有CancelFunc,所以它只用於值的多goroutine傳遞和共享
// 通常這樣調用ctx := context.WithValue(context.Background(), "key", myValue)
func WithValue(parent Context, key, val interface{}) Context

對於會返回CancelFunc的函數,我們必須要使用defer cancel(),否則靜態檢查例如go vet會報錯,理由是因為如果不用defer來終止context的話不能避免goroutine leak,對於帶有超時的context來說cancel還可以停止計時器釋放對應的資源。另外多次調用cancel是無害的,所以及時一個context因為超時而被取消,你依然可以對其使用cancel。所以我們應該把cancel的調用放在defer語句中。

上面是在主goroutine中的處理,對於傳入context的goroutine來說需要做一些結構上的改變:

func coroutine(ctx context.Context, data <-chan int) {
  // setup something
  for {
    select {
    case <-ctx.Done():
      // 一些清理操作
      return
    case i := <-data:
      go handle(ctx, i)
    }
  }
}

可以看見goroutine的主要邏輯結構需要由select包裹,首先檢查本次任務有沒有取消,沒有取消或者超時就從chan裏讀取數據進行處理,如果需要啟動其他goroutine就把ctx傳遞下去。

golang的初學者可能會對這段代碼產生不少疑惑,但是等熟悉了goroutine+chan的使用後就會發現這只是對既有模型的微調,十分便於遷移和修改。

示例

雖然說了這麽多,實際上還都是些很抽象的概念,所以這一節舉幾個例子輔助理解。

首先是使用超時context的例子,每個goroutine運行5秒,每隔一秒打印一段信息,5秒後終止運行:

func coroutine(ctx context.Context, duration time.Duration, id int, wg *sync.WaitGroup) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("goroutine %d finish\n", id)
            wg.Done()
            return
        case <-time.After(duration):
            fmt.Printf("message from goroutine %d\n", id)
        }
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
    defer cancel()

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go coroutine(ctx, 1 * time.Second, i, wg)
    }

    wg.Wait()
}

我們使用WaitGroup等待所有的goroutine執行完畢,在收到<-ctx.Done()的終止信號後使wg中需要等待的goroutine數量減一。因為context只負責取消goroutine,不負責等待goroutine運行,所以需要配合一點輔助手段。如果運行程序你會得到類似如下結果(不同環境運行結果可能不同):

message from goroutine 0
message from goroutine 2
message from goroutine 4
message from goroutine 3
message from goroutine 1
message from goroutine 2
message from goroutine 4
message from goroutine 0
message from goroutine 1
message from goroutine 3
message from goroutine 3
message from goroutine 0
message from goroutine 4
message from goroutine 2
message from goroutine 1
message from goroutine 0
message from goroutine 2
message from goroutine 4
message from goroutine 3
message from goroutine 1
goroutine 0 finish
goroutine 3 finish
goroutine 1 finish
goroutine 2 finish
goroutine 4 finish

上一個例子中示範了超時控制,下一個例子將會演示如何用普通context取消一個goroutine:

func main() {
    // gen是一個生成器,返回從1開始的遞增數字直到自身被取消
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for n := range gen(ctx) {
        fmt.Println(n)
        // 生成到5時終止生成器運行
        if n == 5 {
            break
        }
    }
}

運行結果將會輸出1-5的數字,當生成5之後for循環終止,main退出前defer語句生效,終止goroutine的運行。

最後一個例子是如何在goroutine間共享變量的。

因為可能會被多個goroutine同時修改,所以我們的value必須保證並發安全,不過也可以換種思路,只要保證對value的操作是並發安全的就可以了:

func main() {
    var v int64
    wg := sync.WaitGroup{}
    ctx := context.WithValue(context.Background(), "myKey", &v)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(ctx context.Context, key string) {
            // 取出來的是interface{},需要先斷言成我們需要的類型
            value := ctx.Value(key).(*int64)
            // 原子操作,並發安全
            atomic.AddInt64(value, 1)
            wg.Done()
        }(ctx, "myKey")
    }

    wg.Wait()
    // 類型斷言成*int64然後解引用
    fmt.Println(*(ctx.Value("myKey").(*int64)))
}

運行結果會打印出10,因為有10個goroutine分別對v原子地加了一。

當然,引入類型斷言後代碼復雜度有所提升,但數據的共享卻方便了,你可以基於帶值的context為parent繼續構建可以取消或超時的context,同時可以在其中分發數據而無需將其作為參數傳遞。

context包的使用就是這麽簡單,還有更多對於context的應用,這裏就不一一列舉了,希望各位讀者在以後的開發中能夠多加利用context包,寫出健壯的更優雅的代碼。

參考

context包官方文檔
官方博客的介紹

golang標準庫 context的使用