深入理解Golang之context_轉
轉自:https://juejin.im/post/6844904070667321357
前言
這篇文章將介紹Golang
併發程式設計中常用到一種程式設計模式:context
。本文將從為什麼需要context
出發,深入瞭解context
的實現原理,以及瞭解如何使用context
。
為什麼需要context
在併發程式中,由於超時、取消操作或者一些異常情況,往往需要進行搶佔操作或者中斷後續操作。熟悉channel
的朋友應該都見過使用done channel
來處理此類問題。比如以下這個例子:
func main() { messages := make(chan int, 10) done := make(chan bool) defer close(messages) // consumer go func() { ticker := time.NewTicker(1 * time.Second) for _ = range ticker.C { select { case <-done: fmt.Println("child process interrupt...") return default: fmt.Printf("send message: %d\n", <-messages) } } }() // producer for i := 0; i < 10; i++ { messages <- i } time.Sleep(5 * time.Second) close(done) time.Sleep(1 * time.Second) fmt.Println("main process exit!") }
上述例子中定義了一個buffer
channel done
, 子協程執行著定時任務。如果主協程需要在某個時刻傳送訊息通知子協程中斷任務退出,那麼就可以讓子協程監聽這個done channel
,一旦主協程關閉done channel
,那麼子協程就可以推出了,這樣就實現了主協程通知子協程的需求。這很好,但是這也是有限的。
如果我們可以在簡單的通知上附加傳遞額外的資訊來控制取消:為什麼取消,或者有一個它必須要完成的最終期限,更或者有多個取消選項,我們需要根據額外的資訊來判斷選擇執行哪個取消選項。
考慮下面這種情況:假如主協程中有多個任務1, 2, …m,主協程對這些任務有超時控制;而其中任務1又有多個子任務1, 2, …n,任務1對這些子任務也有自己的超時控制,那麼這些子任務既要感知主協程的取消訊號,也需要感知任務1的取消訊號。
如果還是使用done channel
的用法,我們需要定義兩個done channel
,子任務們需要同時監聽這兩個done channel
。嗯,這樣其實好像也還行哈。但是如果層級更深,如果這些子任務還有子任務,那麼使用done channel
的方式將會變得非常繁瑣且混亂。
我們需要一種優雅的方案來實現這樣一種機制:
- 上層任務取消後,所有的下層任務都會被取消;
- 中間某一層的任務取消後,只會將當前任務的下層任務取消,而不會影響上層的任務以及同級任務。
這個時候context
就派上用場了。我們首先看看context
的結構設計和實現原理。
context是什麼
context介面
先看Context
介面結構,看起來非常簡單。
type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} }
Context
介面包含四個方法:
Deadline
返回綁定當前context
的任務被取消的截止時間;如果沒有設定期限,將返回ok == false
。Done
當綁定當前context
的任務被取消時,將返回一個關閉的channel
;如果當前context
不會被取消,將返回nil
。Err
如果Done
返回的channel
沒有關閉,將返回nil
;如果Done
返回的channel
已經關閉,將返回非空的值表示任務結束的原因。如果是context
被取消,Err
將返回Canceled
;如果是context
超時,Err
將返回DeadlineExceeded
。Value
返回context
儲存的鍵值對中當前key
對應的值,如果沒有對應的key
,則返回nil
。
可以看到Done
方法返回的channel
正是用來傳遞結束訊號以搶佔並中斷當前任務;Deadline
方法指示一段時間後當前goroutine
是否會被取消;以及一個Err
方法,來解釋goroutine
被取消的原因;而Value
則用於獲取特定於當前任務樹的額外資訊。而context
所包含的額外資訊鍵值對是如何儲存的呢?其實可以想象一顆樹,樹的每個節點可能攜帶一組鍵值對,如果當前節點上無法找到key
所對應的值,就會向上去父節點裡找,直到根節點,具體後面會說到。
再來看看context
包中的其他關鍵內容。
emptyCtx
emptyCtx
是一個int
型別的變數,但實現了context
的介面。emptyCtx
沒有超時時間,不能取消,也不能儲存任何額外資訊,所以emptyCtx
用來作為context
樹的根節點。
// An emptyCtx is never canceled, has no values, and has no deadline. It is not // struct{}, since vars of this type must have distinct addresses. type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return } func (*emptyCtx) Done() <-chan struct{} { return nil } func (*emptyCtx) Err() error { return nil } func (*emptyCtx) Value(key interface{}) interface{} { return nil } func (e *emptyCtx) String() string { switch e { case background: return "context.Background" case todo: return "context.TODO" } return "unknown empty Context" } var ( background = new(emptyCtx) todo = new(emptyCtx) ) func Background() Context { return background } func TODO() Context { return todo }
但我們一般不會直接使用emptyCtx
,而是使用由emptyCtx
例項化的兩個變數,分別可以通過呼叫Background
和TODO
方法得到,但這兩個context
在實現上是一樣的。那麼Background
和TODO
方法得到的context
有什麼區別呢?可以看一下官方的解釋:
// Background returns a non-nil, empty Context. It is never canceled, has no // values, and has no deadline. It is typically used by the main function, // initialization, and tests, and as the top-level Context for incoming // requests. // TODO returns a non-nil, empty Context. Code should use context.TODO when // it's unclear which Context to use or it is not yet available (because the // surrounding function has not yet been extended to accept a Context // parameter).
Background
和TODO
只是用於不同場景下:Background
通常被用於主函式、初始化以及測試中,作為一個頂層的context
,也就是說一般我們建立的context
都是基於Background
;而TODO
是在不確定使用什麼context
的時候才會使用。
下面將介紹兩種不同功能的基礎context
型別:valueCtx
和cancelCtx
。
valueCtx
valueCtx結構體
type valueCtx struct { Context key, val interface{} } func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) }
valueCtx
利用一個Context
型別的變數來表示父節點context
,所以當前context
繼承了父context
的所有資訊;valueCtx
型別還攜帶一組鍵值對,也就是說這種context
可以攜帶額外的資訊。valueCtx
實現了Value
方法,用以在context
鏈路上獲取key
對應的值,如果當前context
上不存在需要的key
,會沿著context
鏈向上尋找key
對應的值,直到根節點。
WithValue
WithValue
用以向context
新增鍵值對:
func WithValue(parent Context, key, val interface{}) Context { if key == nil { panic("nil key") } if !reflect.TypeOf(key).Comparable() { panic("key is not comparable") } return &valueCtx{parent, key, val} }
這裡新增鍵值對不是在原context
結構體上直接新增,而是以此context
作為父節點,重新建立一個新的valueCtx
子節點,將鍵值對新增在子節點上,由此形成一條context
鏈。獲取value
的過程就是在這條context
鏈上由尾部上前搜尋:
cancelCtx
cancelCtx結構體
type cancelCtx struct { Context mu sync.Mutex // protects following fields done chan struct{} // created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call } type canceler interface { cancel(removeFromParent bool, err error) Done() <-chan struct{} }
跟valueCtx
類似,cancelCtx
中也有一個context
變數作為父節點;變數done
表示一個channel
,用來表示傳遞關閉訊號;children
表示一個map
,儲存了當前context
節點下的子節點;err
用於儲存錯誤資訊表示任務結束的原因。
再來看一下cancelCtx
實現的方法:
func (c *cancelCtx) Done() <-chan struct{} { c.mu.Lock() if c.done == nil { c.done = make(chan struct{}) } d := c.done c.mu.Unlock() return d } func (c *cancelCtx) Err() error { c.mu.Lock() err := c.err c.mu.Unlock() return err } func (c *cancelCtx) cancel(removeFromParent bool, err error) { if err == nil { panic("context: internal error: missing cancel error") } c.mu.Lock() if c.err != nil { c.mu.Unlock() return // already canceled } // 設定取消原因 c.err = err 設定一個關閉的channel或者將done channel關閉,用以傳送關閉訊號 if c.done == nil { c.done = closedchan } else { close(c.done) } // 將子節點context依次取消 for child := range c.children { // NOTE: acquiring the child's lock while holding parent's lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() if removeFromParent { // 將當前context節點從父節點上移除 removeChild(c.Context, c) } }
可以發現cancelCtx
型別變數其實也是canceler
型別,因為cancelCtx
實現了canceler
介面。Done
方法和Err
方法沒必要說了,cancelCtx
型別的context
在呼叫cancel
方法時會設定取消原因,將done channel
設定為一個關閉channel
或者關閉channel
,然後將子節點context
依次取消,如果有需要還會將當前節點從父節點上移除。
WithCancel
WithCancel
函式用來建立一個可取消的context
,即cancelCtx
型別的context
。WithCancel
返回一個context
和一個CancelFunc
,呼叫CancelFunc
即可觸發cancel
操作。直接看原始碼:
type CancelFunc func() func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func() { c.cancel(true, Canceled) } } // newCancelCtx returns an initialized cancelCtx. func newCancelCtx(parent Context) cancelCtx { // 將parent作為父節點context生成一個新的子節點 return cancelCtx{Context: parent} } func propagateCancel(parent Context, child canceler) { if parent.Done() == nil { // parent.Done()返回nil表明父節點以上的路徑上沒有可取消的context return // parent is never canceled } // 獲取最近的型別為cancelCtx的祖先節點 if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil { // parent has already been canceled child.cancel(false, p.err) } else { if p.children == nil { p.children = make(map[canceler]struct{}) } // 將當前子節點加入最近cancelCtx祖先節點的children中 p.children[child] = struct{}{} } p.mu.Unlock() } else { go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } } func parentCancelCtx(parent Context) (*cancelCtx, bool) { for { switch c := parent.(type) { case *cancelCtx: return c, true case *timerCtx: return &c.cancelCtx, true case *valueCtx: parent = c.Context default: return nil, false } } }
之前說到cancelCtx
取消時,會將後代節點中所有的cancelCtx
都取消,propagateCancel
即用來建立當前節點與祖先節點這個取消關聯邏輯。
- 如果
parent.Done()
返回nil
,表明父節點以上的路徑上沒有可取消的context
,不需要處理; - 如果在
context
鏈上找到到cancelCtx
型別的祖先節點,則判斷這個祖先節點是否已經取消,如果已經取消就取消當前節點;否則將當前節點加入到祖先節點的children
列表。 - 否則開啟一個協程,監聽
parent.Done()
和child.Done()
,一旦parent.Done()
返回的channel
關閉,即context
鏈中某個祖先節點context
被取消,則將當前context
也取消。
這裡或許有個疑問,為什麼是祖先節點而不是父節點?這是因為當前context
鏈可能是這樣的:
當前cancelCtx
的父節點context
並不是一個可取消的context
,也就沒法記錄children
。
timerCtx
timerCtx
是一種基於cancelCtx
的context
型別,從字面上就能看出,這是一種可以定時取消的context
。
type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. deadline time.Time } func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { return c.deadline, true } func (c *timerCtx) cancel(removeFromParent bool, err error) { 將內部的cancelCtx取消 c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx's children. removeChild(c.cancelCtx.Context, c) } c.mu.Lock() if c.timer != nil { 取消計時器 c.timer.Stop() c.timer = nil } c.mu.Unlock() }
timerCtx
內部使用cancelCtx
實現取消,另外使用定時器timer
和過期時間deadline
實現定時取消的功能。timerCtx
在呼叫cancel
方法,會先將內部的cancelCtx
取消,如果需要則將自己從cancelCtx
祖先節點上移除,最後取消計時器。
WithDeadline
WithDeadline
返回一個基於parent
的可取消的context
,並且其過期時間deadline
不晚於所設定時間d
。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { if cur, ok := parent.Deadline(); ok && cur.Before(d) { // The current deadline is already sooner than the new one. return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } // 建立新建context與可取消context祖先節點的取消關聯關係 propagateCancel(parent, c) dur := time.Until(d) if dur <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(false, Canceled) } } c.mu.Lock() defer c.mu.Unlock() if c.err == nil { c.timer = time.AfterFunc(dur, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } }
- 如果父節點
parent
有過期時間並且過期時間早於給定時間d
,那麼新建的子節點context
無需設定過期時間,使用WithCancel
建立一個可取消的context
即可; - 否則,就要利用
parent
和過期時間d
建立一個定時取消的timerCtx
,並建立新建context
與可取消context
祖先節點的取消關聯關係,接下來判斷當前時間距離過期時間d
的時長dur
:
- 如果
dur
小於0,即當前已經過了過期時間,則直接取消新建的timerCtx
,原因為DeadlineExceeded
; - 否則,為新建的
timerCtx
設定定時器,一旦到達過期時間即取消當前timerCtx
。
WithTimeout
與WithDeadline
類似,WithTimeout
也是建立一個定時取消的context
,只不過WithDeadline
是接收一個過期時間點,而WithTimeout
接收一個相對當前時間的過期時長timeout
:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }
context的使用
首先使用context
實現文章開頭done channel
的例子來示範一下如何更優雅實現協程間取消訊號的同步:
func main() { messages := make(chan int, 10) // producer for i := 0; i < 10; i++ { messages <- i } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // consumer go func(ctx context.Context) { ticker := time.NewTicker(1 * time.Second) for _ = range ticker.C { select { case <-ctx.Done(): fmt.Println("child process interrupt...") return default: fmt.Printf("send message: %d\n", <-messages) } } }(ctx) defer close(messages) defer cancel() select { case <-ctx.Done(): time.Sleep(1 * time.Second) fmt.Println("main process exit!") } }
這個例子中,只要讓子執行緒監聽主執行緒傳入的ctx
,一旦ctx.Done()
返回空channel
,子執行緒即可取消執行任務。但這個例子還無法展現context
的傳遞取消資訊的強大優勢。
閱讀過net/http
包原始碼的朋友可能注意到在實現http server
時就用到了context
, 下面簡單分析一下。
1、首先Server
在開啟服務時會建立一個valueCtx
,儲存了server
的相關資訊,之後每建立一條連線就會開啟一個協程,並攜帶此valueCtx
。
func (srv *Server) Serve(l net.Listener) error { ... var tempDelay time.Duration // how long to sleep on accept failure baseCtx := context.Background() // base is always background, per Issue 16220 ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { rw, e := l.Accept() ... tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // before Serve can return go c.serve(ctx) } }
2、建立連線之後會基於傳入的context
建立一個valueCtx
用於儲存本地地址資訊,之後在此基礎上又建立了一個cancelCtx
,然後開始從當前連線中讀取網路請求,每當讀取到一個請求則會將該cancelCtx
傳入,用以傳遞取消訊號。一旦連線斷開,即可傳送取消訊號,取消所有進行中的網路請求。
func (c *conn) serve(ctx context.Context) { c.remoteAddr = c.rwc.RemoteAddr().String() ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) ... ctx, cancelCtx := context.WithCancel(ctx) c.cancelCtx = cancelCtx defer cancelCtx() ... for { w, err := c.readRequest(ctx) ... serverHandler{c.server}.ServeHTTP(w, w.req) ... } }
3、讀取到請求之後,會再次基於傳入的context
建立新的cancelCtx
,並設定到當前請求物件req
上,同時生成的response
物件中cancelCtx
儲存了當前context
取消方法。
func (c *conn) readRequest(ctx context.Context) (w *response, err error) { ... req, err := readRequest(c.bufr, keepHostHeader) ... ctx, cancelCtx := context.WithCancel(ctx) req.ctx = ctx ... w = &response{ conn: c, cancelCtx: cancelCtx, req: req, reqBody: req.Body, handlerHeader: make(Header), contentLength: -1, closeNotifyCh: make(chan bool, 1), // We populate these ahead of time so we're not // reading from req.Header after their Handler starts // and maybe mutates it (Issue 14940) wants10KeepAlive: req.wantsHttp10KeepAlive(), wantsClose: req.wantsClose(), } ... return w, nil }
這樣處理的目的主要有以下幾點:
-
一旦請求超時,即可中斷當前請求;
-
在處理構建
response
過程中如果發生錯誤,可直接呼叫response
物件的cancelCtx
方法結束當前請求; -
在處理構建
response
完成之後,呼叫response
物件的cancelCtx
方法結束當前請求。
在整個server
處理流程中,使用了一條context
鏈貫穿Server
、Connection
、Request
,不僅將上游的資訊共享給下游任務,同時實現了上游可傳送取消訊號取消所有下游任務,而下游任務自行取消不會影響上游任務。
總結
context
主要用於父子任務之間的同步取消訊號,本質上是一種協程排程的方式。另外在使用context
時有兩點值得注意:上游任務僅僅使用context
通知下游任務不再需要,但不會直接干涉和中斷下游任務的執行,由下游任務自行決定後續的處理操作,也就是說context
的取消操作是無侵入的;context
是執行緒安全的,因為context
本身是不可變的(immutable
),因此可以放心地在多個協程中傳遞使用。