Golang使用NSQ示例
阿新 • • 發佈:2018-11-02
一、下載
安裝包下載地址:https://nsq.io/deployment/installing.html
nsq原始碼地址:https://github.com/nsqio/nsq
官方提供的golang版本客戶端:https://github.com/nsqio/go-nsq
二、執行NSQ
- 首先啟動nsqlookud
nsqlookupd
- 啟動nsqd,並接入剛剛啟動的nsqlookud。這裡為了方便接下來的測試,啟動了兩個nsqd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=0.0.0.0:4152 --http-address=0.0.0.0:4153
- 啟動nqsadmin
nsqadmin --lookupd-http-address=127.0.0.1:4161
三、示例
使用官方提供的golang客戶端:
go get github.com/nsqio/go-nsq
1.生成者
ps:開啟兩個生產者producer1和producer2,由這兩個生產者交替生產訊息
package main import ( "fmt" "log" "bufio" "os" "github.com/nsqio/go-nsq" ) func main() { strIP1 := "127.0.0.1:4150" strIP2 := "127.0.0.1:4152" producer1,err := initProducer(strIP1) if err != nil { log.Fatal("init producer1 error:",err) } producer2,err := initProducer(strIP2) if err != nil { log.Fatal("init producer2 error:",err) } defer producer1.Stop() defer producer2.Stop() //讀取控制檯輸入 reader := bufio.NewReader(os.Stdin) count := 0 for { fmt.Print("please say:") data, _, _ := reader.ReadLine() command := string(data) if command == "stop" { fmt.Println("stop producer!") return } if count % 2 == 0 { err := producer1.public("test1",command) if err != nil { log.Fatal("producer1 public error:",err) } }else { err := producer2.public("test2",command) if err != nil { log.Fatal("producer2 public error:",err) } } count++ } } type nsqProducer struct { *nsq.Producer } //初始化生產者 func initProducer(addr string) (*nsqProducer, error) { fmt.Println("init producer address:",addr) producer,err := nsq.NewProducer(addr,nsq.NewConfig()) if err != nil { return nil,err } return &nsqProducer{producer},nil } //釋出訊息 func (np *nsqProducer)public(topic,message string) error { err := np.Publish(topic,[]byte(message)) if err != nil { log.Println("nsq public error:",err) return err } return nil }
2.消費者
ps:初始化兩個消費者消費test1和test2
package main
import (
"time"
"fmt"
"log"
"github.com/nsqio/go-nsq"
)
func main() {
err := initConsumer("test1", "test-channel1", "127.0.0.1:4161")
if err != nil {
log.Fatal("init Consumer error")
}
err = initConsumer("test2","test-channel2","127.0.0.1:4161")
if err != nil {
log.Fatal("init Consumer error")
}
select {
}
}
type nsqHandler struct {
nsqConsumer *nsq.Consumer
messagesReceived int
}
//處理訊息
func (nh *nsqHandler)HandleMessage(msg *nsq.Message) error{
nh.messagesReceived++
fmt.Printf("receive ID:%s,addr:%s,message:%s",msg.ID, msg.NSQDAddress, string(msg.Body))
fmt.Println()
return nil
}
func initConsumer(topic, channel, addr string) error {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = 3*time.Second
c,err := nsq.NewConsumer(topic,channel,cfg)
if err != nil {
log.Println("init Consumer NewConsumer error:",err)
return err
}
handler := &nsqHandler{nsqConsumer:c}
c.AddHandler(handler)
err = c.ConnectToNSQLookupd(addr)
if err != nil {
log.Println("init Consumer ConnectToNSQLookupd error:",err)
return err
}
return nil
}
結果:
生產者:
消費者:
也可以通過nsqadmin http://localhost:4171/ 檢視nsq相關資訊