1. 程式人生 > >golang 無限制同步隊列(unlimited buffer channel)

golang 無限制同步隊列(unlimited buffer channel)

直接 支持 turn mov 恢復 done cnblogs int mark

問題

如何支持一個無容量限制的channel

  • 取出元素會阻塞到元素存在並且返回
  • 放入元素永遠不會阻塞,都會立即返回

方法一:用兩個chan加一個list模擬

在單獨的goroutine處理入隊和出隊,這樣不用給list加鎖。

完整代碼:https://github.com/luweimy/goutil/blob/master/syncq/syncq.go

q := &SyncQueue{
    ctx:    ctx,
    cancel: cancel,
    l:      list.New(),
    max:    max,
    in:     make(chan interface{}),
    out:    make(chan interface{}),
}
func (q *SyncQueue) dispatch() {
    for {
        if q.l.Len() == 0 {
            // the queue is empty, only enqueue is allowed.
            select {
            case v := <-q.in:
                q.l.PushBack(v)
            case <-q.ctx.Done():
                return
            }
        }
        e := q.l.Front()
        if q.max > 0 && q.l.Len() >= q.max {
            // the queue is full, only dequeue is allowed.
            select {
            case q.out <- e.Value:
                q.l.Remove(e)
            case <-q.ctx.Done():
                return
            }
        } else {
            // enqueue and dequeue are allowed.
            select {
            case value := <-q.in:
                q.l.PushBack(value)
            case q.out <- e.Value:
                q.l.Remove(e)
            case <-q.ctx.Done():
                return
            }
        }
    }
}

但是這種方法速度很慢,跑benchmark只有1234 ns/op

方法二:用sync.Cond通知

這個方法比較簡單,就是利用sync.Cond的通知機制。
出隊時,檢測隊列內有無元素,有就直接返回,沒有則阻塞等待條件變量。
入隊時,觸發條件變量通知一個阻塞的端點恢復運行。

完整代碼:https://github.com/luweimy/goutil/blob/master/syncq2/syncq2.go

func (q *SyncQueue) Enqueue(value interface{}) {
    call.WithLock(q.cond.L, func() {
        q.l.PushBack(value)
        q.cond.Signal()
    })
}

func (q *SyncQueue) Dequeue() interface{} {
    var v interface{}
    call.WithLock(q.cond.L, func() {
        // if queue is empty, wait enqueue
        for q.l.Len() <= 0 {
            q.cond.Wait()
        }
        v = q.l.Remove(q.l.Front())
    })
    return v
}

這種方法速度比上面的快,跑benchmark241 ns/op

golang 無限制同步隊列(unlimited buffer channel)