golang中的標準庫context
在 Go http包的Server中,每一個請求在都有一個對應的 goroutine 去處理。請求處理函式通常會啟動額外的 goroutine 用來訪問後端服務,比如資料庫和RPC服務。用來處理一個請求的 goroutine 通常需要訪問一些與請求特定的資料,比如終端使用者的身份認證資訊、驗證相關的token、請求的截止時間。 當一個請求被取消或超時時,所有用來處理該請求的 goroutine 都應該迅速退出,然後系統才能釋放這些 goroutine 佔用的資源。
為什麼需要Context
- 基本示例
var wg sync.WaitGroup func main() { wg.Add(1) go worker() wg.Wait() } func worker() { for { fmt.Println("worker 執行了") time.Sleep(time.Second) } wg.Done() }
全域性變數方式
var wg sync.WaitGroup var exit bool // 全域性變數方式存在的問題: // 1. 使用全域性變數在跨包呼叫時不容易統一 // 2. 如果worker中再啟動goroutine,就不太好控制了 func main() { wg.Add(1) go worker() time.Sleep(time.Second * 3) // 等待3秒,以免程式過快退出 exit = true // 修改全域性變數實現子goroutine的退出 wg.Wait() // 主go程等待子go程結束 } func worker() { for { fmt.Println("worker 執行了") time.Sleep(time.Second) if exit { break } } wg.Done() }
通道方式
var wg sync.WaitGroup var ch = make(chan string) // 管道方式存在的問題: // 1. 使用全域性變數在跨包呼叫時不容易實現規範和統一,需要維護一個共用的channel func main() { wg.Add(1) go worker() time.Sleep(time.Second * 3) // 等待3秒,以免程式過快退出 ch <- "exit" close(ch) wg.Wait() // 主go程等待子go程結束 } func worker() { loop: for { fmt.Println("worker 執行了") time.Sleep(time.Second) select { case <-ch: break loop default: } } wg.Done() }
官方版的方案
var wg sync.WaitGroup
func main() {
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go worker(ctx)
time.Sleep(time.Second * 3) // 等待3秒,以免程式過快退出
cancel() // 通知goroutine結束
wg.Wait()
}
func worker(ctx context.Context) {
loop:
for {
fmt.Println("worker 執行了")
time.Sleep(time.Second)
select {
case <-ctx.Done(): // 等待上級通知
break loop
default:
}
}
wg.Done()
}
當子goroutine又開啟另外一個goroutine時,只需要將ctx傳入即可:
var wg sync.WaitGroup
func main() {
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go worker(ctx)
time.Sleep(time.Second * 3) // 等待3秒,以免程式過快退出
cancel() // 通知goroutine結束
wg.Wait()
}
func worker(ctx context.Context) {
go worker2(ctx)
loop:
for {
fmt.Println("worker 執行了")
time.Sleep(time.Second)
select {
case <-ctx.Done(): // 等待上級通知
break loop
default:
}
}
wg.Done()
}
func worker2(ctx context.Context) {
loop:
for {
fmt.Println("workder2 gagaga")
time.Sleep(time.Second * 1)
select {
case <-ctx.Done(): // 等待上級通知
break loop
default:
}
}
}
context初識
Go1.7加入了一個新的標準庫context,它定義了Context型別,專門用來簡化 對於處理單個請求的多個 goroutine 之間與請求域的資料、取消訊號、截止時間等相關操作,這些操作可能涉及多個 API 呼叫。
對伺服器傳入的請求應該建立上下文,而對伺服器的傳出呼叫應該接受上下文。它們之間的函式呼叫鏈必須傳遞上下文,或者可以使用WithCancel、WithDeadline、WithTimeout或WithValue建立的派生上下文。當一個上下文被取消時,它派生的所有上下文也被取消。
Context介面
context.Context是一個介面,該介面定義了四個需要實現的方法。具體簽名如下:
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
其中:
Deadline方法需要返回當前Context被取消的時間,也就是完成工作的截止時間(deadline);
Done方法需要返回一個Channel,這個Channel會在當前工作完成或者上下文被取消之後關閉,多次呼叫Done方法會返回同一個Channel;
Err方法會返回當前Context結束的原因,它只會在Done返回的Channel被關閉時才會返回非空的值;
如果當前Context被取消就會返回Canceled錯誤;
如果當前Context超時就會返回DeadlineExceeded錯誤;
Value方法會從Context中返回鍵對應的值,對於同一個上下文來說,多次呼叫Value 並傳入相同的Key會返回相同的結果,該方法僅用於傳遞跨API和程序間跟請求域的資料;
Background()和TODO()
Go內建兩個函式:Background()和TODO(),這兩個函式分別返回一個實現了Context介面的background和todo。我們程式碼中最開始都是以這兩個內建的上下文物件作為最頂層的partent context,衍生出更多的子上下文物件。
Background()主要用於main函式、初始化以及測試程式碼中,作為Context這個樹結構的最頂層的Context,也就是根Context。
TODO(),它目前還不知道具體的使用場景,如果我們不知道該使用什麼Context的時候,可以使用這個。
background和todo本質上都是emptyCtx結構體型別,是一個不可取消,沒有設定截止時間,沒有攜帶任何值的Context。
With系列函式
此外,context包中還定義了四個With系列函式。
WithCancel
WithCancel的函式簽名如下:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
WithCancel返回帶有新Done通道的父節點的副本。當呼叫返回的cancel函式或當關閉父上下文的Done通道時,將關閉返回上下文的Done通道,無論先發生什麼情況。
取消此上下文將釋放與其關聯的資源,因此程式碼應該在此上下文中執行的操作完成後立即呼叫cancel。
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 當我們取完需要的整數後,呼叫cancel
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}
}
func gen(ctx context.Context) <-chan int { // 返回只讀通道
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
return // 結束該goroutine,防止洩漏
case dst <- n: // 將n數字寫入到通道中去
n++
}
}
}()
return dst
}
上面的示例程式碼中,gen函式在單獨的goroutine中生成整數並將它們傳送到返回的通道。 gen的呼叫者在使用生成的整數之後需要取消上下文,以免gen啟動的內部goroutine發生洩漏。
WithDeadline
WithDeadline的函式簽名如下:
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
返回父上下文的副本,並將deadline調整為不遲於d。如果父上下文的deadline已經早於d,則WithDeadline(parent, d)在語義上等同於父上下文。當截止日過期時,當呼叫返回的cancel函式時,或者當父上下文的Done通道關閉時,返回上下文的Done通道將被關閉,以最先發生的情況為準。
取消此上下文將釋放與其關聯的資源,因此程式碼應該在此上下文中執行的操作完成後立即呼叫cancel。
func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
// 儘管ctx會過期,但在任何情況下呼叫它的cancel函式都是很好的實踐。
// 如果不這樣做,可能會使上下文及其父類存活的時間超過必要的時間。
defer cancel()
select {
case <-time.After(time.Second):
fmt.Println("overSleep")
case <-ctx.Done():
fmt.Println(ctx.Err())
}
}
上面的程式碼中,定義了一個50毫秒之後過期的deadline,然後我們呼叫context.WithDeadline(context.Background(), d)得到一個上下文(ctx)和一個取消函式(cancel),然後使用一個select讓主程式陷入等待:等待1秒後列印overslept退出或者等待ctx過期後退出。 因為ctx50秒後就過期,所以ctx.Done()會先接收到值,上面的程式碼會列印ctx.Err()取消原因。
WithTimeout
WithTimeout的函式簽名如下:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
WithTimeout返回WithDeadline(parent, time.Now().Add(timeout))。
取消此上下文將釋放與其相關的資源,因此程式碼應該在此上下文中執行的操作完成後立即呼叫cancel,通常用於資料庫或者網路連線的超時控制。具體示例如下:
var wg sync.WaitGroup
func main() {
// 設定一個50毫秒的超時
ctx, cancel := context.WithTimeout(context.Background(), 50 * time.Millisecond)
wg.Add(1)
go worker(ctx)
time.Sleep(5 * time.Second)
cancel() // 通知子goroutine結束
wg.Wait()
fmt.Println("over")
}
func worker(ctx context.Context) {
loop:
for {
fmt.Println("db conncting")
time.Sleep(time.Millisecond * 10)
select {
case <-ctx.Done(): // 50毫秒後自動呼叫
break loop
default:
}
}
fmt.Println("worker done")
wg.Done()
}
WithValue
WithValue函式能夠將請求作用域的資料與 Context 物件建立關係。宣告如下:
func WithValue(parent Context, key, val interface{}) Context
WithValue返回父節點的副本,其中與key關聯的值為val。
僅對API和程序間傳遞請求域的資料使用上下文值,而不是使用它來傳遞可選引數給函式。
所提供的鍵必須是可比較的,並且不應該是string型別或任何其他內建型別,以避免使用上下文在包之間發生衝突。WithValue的使用者應該為鍵定義自己的型別。為了避免在分配給interface{}時進行分配,上下文鍵通常具有具體型別struct{}。或者,匯出的上下文關鍵變數的靜態型別應該是指標或介面。
var wg sync.WaitGroup
type TraceCode string
func main() {
// 設定一個50毫秒的超時
ctx, cancel := context.WithTimeout(context.Background(), 50 * time.Millisecond)
// 在系統的入口中設定trace code傳遞給後續啟動的goroutine實現日誌資料聚合
ctx = context.WithValue(ctx, TraceCode("trace_code"), "123456789")
wg.Add(1)
go worker(ctx)
time.Sleep(5 * time.Second)
cancel() // 通知子goroutine結束
wg.Wait()
fmt.Println("over")
}
func worker(ctx context.Context) {
key := TraceCode("trace_code")
traceCode, ok := ctx.Value(key).(string) // 在子goroutine中獲取trace_code
if !ok {
fmt.Println("invalid trace code")
}
loop:
for {
fmt.Println("worker trace code: ", traceCode)
time.Sleep(time.Millisecond * 10)
select {
case <-ctx.Done(): // 50毫秒後自動呼叫
break loop
default:
}
}
fmt.Println("worker done")
wg.Done()
}
使用context的注意事項
- 推薦以引數的方式顯示傳遞Context
- 以Context作為引數的函式方法,應該把Context作為第一個引數。
- 給一個函式方法傳遞Context的時候,不要傳遞nil,如果不知道傳遞什麼,就使用context.TODO()
- Context的Value相關方法應該傳遞請求域的必要資料,不應該用於傳遞可選引數
- Context是執行緒安全的,可以放心的在多個goroutine中傳遞
客戶端超時取消示例
呼叫服務端API時如何在客戶端實現超時控制?
server端
點選檢視程式碼
// context_timeout/server/main.go
package main
import (
"fmt"
"math/rand"
"net/http"
"time"
)
// server端,隨機出現慢響應
func indexHandler(w http.ResponseWriter, r *http.Request) {
number := rand.Intn(2)
if number == 1 {
time.Sleep(time.Second * 10) // 耗時10秒的慢響應
fmt.Fprintf(w, "slow response")
return
}
fmt.Fprint(w, "quick response")
}
func main() {
http.HandleFunc("/", indexHandler)
err := http.ListenAndServe(":8000", nil)
if err != nil {
panic(err)
}
}
client端
點選檢視程式碼
``` // context_timeout/client/main.go package mainimport (
"context"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
// 客戶端
type respData struct {
resp *http.Response
err error
}
func doCall(ctx context.Context) {
transport := http.Transport{
// 請求頻繁可定義全域性的client物件並啟用長連結
// 請求不頻繁使用短連結
DisableKeepAlives: true,
}
client := http.Client{
Transport: &transport,
}
respChan := make(chan *respData, 1)
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
if err != nil {
fmt.Printf("new requestg failed, err:%v\n", err)
return
}
req = req.WithContext(ctx) // 使用帶超時的ctx建立一個新的client request
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
resp, err := client.Do(req)
fmt.Printf("client.do resp:%v, err:%v\n", resp, err)
rd := &respData{
resp: resp,
err: err,
}
respChan <- rd
wg.Done()
}()
select {
case <-ctx.Done():
//transport.CancelRequest(req)
fmt.Println("call api timeout")
case result := <-respChan:
fmt.Println("call server api success")
if result.err != nil {
fmt.Printf("call server api failed, err:%v\n", result.err)
return
}
defer result.resp.Body.Close()
data, _ := ioutil.ReadAll(result.resp.Body)
fmt.Printf("resp:%v\n", string(data))
}
}
func main() {
// 定義一個100毫秒的超時
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel() // 呼叫cancel釋放子goroutine資源
doCall(ctx)
}
</details>