EasyCVR開發中NSQ延時推流功能的測試和實現
EasyCVR 是TSINGSEE青犀視訊開發的高穩定、高接入性的視訊平臺,可接入的協議豐富,且可通過國標協議級聯。EasyCVR 各模組之間進行訊息通訊時,需要一款訊息中介軟體進行訊息的傳輸和傳送。在調研各種 MQ 中介軟體後,確定採用 NSQ 來進行編譯。
EasyCVR 使用 NSQ 時,希望延時 60s,消費端才能夠收到對應的訊息,因此我們本文主要是調研是否有該功能的過程,我們主要使用 DeferredPublish 方法實現,方法程式碼如下:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"time"
"zhangqiadams.com/gotools/model/consts"
func main() {
config := nsq.NewConfig()
// 1. 向 nsqd 的 tcp 埠傳送訊息,因此進行對應的配置
producer, err := nsq.NewProducer("127.0.0.1:4154", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello world delay")
topicName := "topic2"
// 2. 同步推流到 nspd, 同步推流代表等待 nspd 的響應,如果傳送失敗返回錯誤。
// PublishAsync 代表是非同步推送訊息給 nspd,傳送完訊息後立刻返回
err = producer.DeferredPublish(topicName, 60*time.Second, messageBody)
fmt.Println("傳送訊息時間為", time.Now().Format(consts.TimeFormat))
if err != nil {
log.Fatal(err)
}
/*sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan*/
// 3. 停止生產者,一般在停止服務,停止程序的時候需要呼叫
producer.Stop()
}
在 14:06:45 開始傳送了一個訊息。
消費者在 60s 後收到訊息,14:07:46 收到對應的訊息。
經過程式碼確認,延時訊息的傳送是在 nsqd 中進行實現的,延時推流功能已經實現。