1. 程式人生 > 其它 >Golang訂閱者模式

Golang訂閱者模式

程式碼

package pubsub

import (
	"sync"
	"time"
)

type (
	subscriber chan interface{}         // 訂閱者管道
	topicFunc  func(v interface{}) bool // 主題過濾器
)

// 釋出者物件
type Publisher struct {
	m           sync.RWMutex             // 讀寫鎖
	buffer      int                      // 訂閱佇列快取大小
	timeout     time.Duration            // 釋出超時時間
	subscribers map[subscriber]topicFunc // 訂閱者資訊
}

// 構建釋出者物件 設定釋出超時時間和快取佇列長度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]topicFunc),
	}
}

// 新增訂閱者 訂閱全部主題
func (this *Publisher) Subscribe() chan interface{} {
	return this.SubscribeTopic(nil)
}

// 新增新訂閱者 訂閱過濾器篩選後的主題
func (this *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	ch := make(chan interface{}, this.buffer)
	this.m.Lock()
	this.subscribers[ch] = topic
	this.m.Unlock()
	return ch
}

// 退出訂閱
func (this *Publisher) Evict(sub chan interface{}) {
	this.m.Lock()
	defer this.m.Unlock()

	delete(this.subscribers, sub)
	close(sub)
}

// 釋出一個主題
func (this *Publisher) Publish(v interface{}) {
	this.m.RLock()
	defer this.m.RUnlock()

	var wg sync.WaitGroup
	for sub, topic := range this.subscribers {
		wg.Add(1)
		go this.sendTopic(sub, topic, v, &wg)
	}
	wg.Wait()
}

// 傳送主題(可以容忍一定超時)
func (this *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	if topic != nil && !topic(v) {
		return
	}

	select {
	case sub <- v:
	case <-time.After(this.timeout):
	}
}

// 關閉釋出者物件 同時關閉所有訂閱者管道
func (this *Publisher) Close() {
	this.m.Lock()
	defer this.m.Unlock()

	for sub := range this.subscribers {
		delete(this.subscribers, sub)
		close(sub)
	}
}