golang 無限制同步隊列(unlimited buffer channel)
阿新 • • 發佈:2018-02-14
直接 支持 turn mov 恢復 done cnblogs int mark
問題
如何支持一個無容量限制的channel
- 取出元素會阻塞到元素存在並且返回
- 放入元素永遠不會阻塞,都會立即返回
方法一:用兩個chan加一個list模擬
在單獨的goroutine處理入隊和出隊,這樣不用給list加鎖。
完整代碼:https://github.com/luweimy/goutil/blob/master/syncq/syncq.go
q := &SyncQueue{ ctx: ctx, cancel: cancel, l: list.New(), max: max, in: make(chan interface{}), out: make(chan interface{}), }
func (q *SyncQueue) dispatch() { for { if q.l.Len() == 0 { // the queue is empty, only enqueue is allowed. select { case v := <-q.in: q.l.PushBack(v) case <-q.ctx.Done(): return } } e := q.l.Front() if q.max > 0 && q.l.Len() >= q.max { // the queue is full, only dequeue is allowed. select { case q.out <- e.Value: q.l.Remove(e) case <-q.ctx.Done(): return } } else { // enqueue and dequeue are allowed. select { case value := <-q.in: q.l.PushBack(value) case q.out <- e.Value: q.l.Remove(e) case <-q.ctx.Done(): return } } } }
但是這種方法速度很慢,跑benchmark只有1234 ns/op
方法二:用sync.Cond通知
這個方法比較簡單,就是利用sync.Cond的通知機制。
出隊時,檢測隊列內有無元素,有就直接返回,沒有則阻塞等待條件變量。
入隊時,觸發條件變量通知一個阻塞的端點恢復運行。
完整代碼:https://github.com/luweimy/goutil/blob/master/syncq2/syncq2.go
func (q *SyncQueue) Enqueue(value interface{}) { call.WithLock(q.cond.L, func() { q.l.PushBack(value) q.cond.Signal() }) } func (q *SyncQueue) Dequeue() interface{} { var v interface{} call.WithLock(q.cond.L, func() { // if queue is empty, wait enqueue for q.l.Len() <= 0 { q.cond.Wait() } v = q.l.Remove(q.l.Front()) }) return v }
這種方法速度比上面的快,跑benchmark241 ns/op
golang 無限制同步隊列(unlimited buffer channel)