Golang訂閱者模式
阿新 • • 發佈:2021-12-16
程式碼
package pubsub import ( "sync" "time" ) type ( subscriber chan interface{} // 訂閱者管道 topicFunc func(v interface{}) bool // 主題過濾器 ) // 釋出者物件 type Publisher struct { m sync.RWMutex // 讀寫鎖 buffer int // 訂閱佇列快取大小 timeout time.Duration // 釋出超時時間 subscribers map[subscriber]topicFunc // 訂閱者資訊 } // 構建釋出者物件 設定釋出超時時間和快取佇列長度 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), } } // 新增訂閱者 訂閱全部主題 func (this *Publisher) Subscribe() chan interface{} { return this.SubscribeTopic(nil) } // 新增新訂閱者 訂閱過濾器篩選後的主題 func (this *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, this.buffer) this.m.Lock() this.subscribers[ch] = topic this.m.Unlock() return ch } // 退出訂閱 func (this *Publisher) Evict(sub chan interface{}) { this.m.Lock() defer this.m.Unlock() delete(this.subscribers, sub) close(sub) } // 釋出一個主題 func (this *Publisher) Publish(v interface{}) { this.m.RLock() defer this.m.RUnlock() var wg sync.WaitGroup for sub, topic := range this.subscribers { wg.Add(1) go this.sendTopic(sub, topic, v, &wg) } wg.Wait() } // 傳送主題(可以容忍一定超時) func (this *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { defer wg.Done() if topic != nil && !topic(v) { return } select { case sub <- v: case <-time.After(this.timeout): } } // 關閉釋出者物件 同時關閉所有訂閱者管道 func (this *Publisher) Close() { this.m.Lock() defer this.m.Unlock() for sub := range this.subscribers { delete(this.subscribers, sub) close(sub) } }