1. 程式人生 > 其它 >EasyCVR開發中NSQ延時推流功能的測試和實現

EasyCVR開發中NSQ延時推流功能的測試和實現

EasyCVR 是TSINGSEE青犀視訊開發的高穩定、高接入性的視訊平臺,可接入的協議豐富,且可通過國標協議級聯。EasyCVR 各模組之間進行訊息通訊時,需要一款訊息中介軟體進行訊息的傳輸和傳送。在調研各種 MQ 中介軟體後,確定採用 NSQ 來進行編譯。

EasyCVR 使用 NSQ 時,希望延時 60s,消費端才能夠收到對應的訊息,因此我們本文主要是調研是否有該功能的過程,我們主要使用 DeferredPublish 方法實現,方法程式碼如下:

package main

import (

"fmt"

"github.com/nsqio/go-nsq"

"log"

"time"

"zhangqiadams.com/gotools/model/consts"

func main() {

config := nsq.NewConfig()

// 1. 向 nsqd 的 tcp 埠傳送訊息,因此進行對應的配置

producer, err := nsq.NewProducer("127.0.0.1:4154", config)

if err != nil {

log.Fatal(err)

}

messageBody := []byte("hello world delay")

topicName := "topic2"

// 2. 同步推流到 nspd, 同步推流代表等待 nspd 的響應,如果傳送失敗返回錯誤。

// PublishAsync 代表是非同步推送訊息給 nspd,傳送完訊息後立刻返回

err = producer.DeferredPublish(topicName, 60*time.Second, messageBody)

fmt.Println("傳送訊息時間為", time.Now().Format(consts.TimeFormat))

if err != nil {

log.Fatal(err)

}

/*sigChan := make(chan os.Signal, 1)

signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

<-sigChan*/

// 3. 停止生產者,一般在停止服務,停止程序的時候需要呼叫

producer.Stop()

}

在 14:06:45 開始傳送了一個訊息。

消費者在 60s 後收到訊息,14:07:46 收到對應的訊息。

經過程式碼確認,延時訊息的傳送是在 nsqd 中進行實現的,延時推流功能已經實現。