1. 程式人生 > >Golang使用NSQ示例

Golang使用NSQ示例

一、下載

  安裝包下載地址:https://nsq.io/deployment/installing.html

  nsq原始碼地址:https://github.com/nsqio/nsq

 官方提供的golang版本客戶端:https://github.com/nsqio/go-nsq

 

二、執行NSQ

  •  首先啟動nsqlookud
nsqlookupd
  • 啟動nsqd,並接入剛剛啟動的nsqlookud。這裡為了方便接下來的測試,啟動了兩個nsqd
nsqd --lookupd-tcp-address=127.0.0.1:4160
nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=0.0.0.0:4152 --http-address=0.0.0.0:4153
  • 啟動nqsadmin
nsqadmin --lookupd-http-address=127.0.0.1:4161

 

三、示例

    使用官方提供的golang客戶端:

go get github.com/nsqio/go-nsq

  

1.生成者

      ps:開啟兩個生產者producer1和producer2,由這兩個生產者交替生產訊息

package main

import (
	"fmt"
	"log"
	"bufio"
	"os"

	"github.com/nsqio/go-nsq"
)


func main() {
	strIP1 := "127.0.0.1:4150"
	strIP2 := "127.0.0.1:4152"

	producer1,err := initProducer(strIP1)
	if err != nil {
		log.Fatal("init producer1 error:",err)
	}
	producer2,err := initProducer(strIP2)
	if err != nil {
		log.Fatal("init producer2 error:",err)
	}

	defer producer1.Stop()
	defer producer2.Stop()

	//讀取控制檯輸入
	reader := bufio.NewReader(os.Stdin)

	count := 0
	for {
		fmt.Print("please say:")
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			fmt.Println("stop producer!")
			return
		}
		if count % 2 == 0 {
			err := producer1.public("test1",command)
			if err != nil {
				log.Fatal("producer1 public error:",err)
			}
		}else {
			err := producer2.public("test2",command)
			if err != nil {
				log.Fatal("producer2 public error:",err)
			}
		}

		count++
	}
}

type nsqProducer struct {
	 *nsq.Producer
}


//初始化生產者
func initProducer(addr string) (*nsqProducer, error) {
	fmt.Println("init producer address:",addr)
	producer,err := nsq.NewProducer(addr,nsq.NewConfig())
	if err != nil {
		return nil,err
	}
	return &nsqProducer{producer},nil
}

//釋出訊息
func (np *nsqProducer)public(topic,message string) error {
	err := np.Publish(topic,[]byte(message))
	if err != nil {
		log.Println("nsq public error:",err)
		return err
	}
	return nil
}

 

 2.消費者

    ps:初始化兩個消費者消費test1和test2

package main

import (
	"time"
	"fmt"
	"log"
	
	"github.com/nsqio/go-nsq"
)

func main() {
	err := initConsumer("test1", "test-channel1", "127.0.0.1:4161")
	if err != nil {
		log.Fatal("init Consumer error")
	}
	err = initConsumer("test2","test-channel2","127.0.0.1:4161")
	if err != nil {
		log.Fatal("init Consumer error")
	}
	select {

	}
}


type nsqHandler struct {
	nsqConsumer      *nsq.Consumer
	messagesReceived int
}

//處理訊息
func (nh *nsqHandler)HandleMessage(msg *nsq.Message) error{
	nh.messagesReceived++
	fmt.Printf("receive ID:%s,addr:%s,message:%s",msg.ID, msg.NSQDAddress, string(msg.Body))
	fmt.Println()
	return nil
}

func initConsumer(topic, channel, addr string) error {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 3*time.Second
	c,err := nsq.NewConsumer(topic,channel,cfg)
	if err != nil {
		log.Println("init Consumer NewConsumer error:",err)
		return err
	}

	handler := &nsqHandler{nsqConsumer:c}
	c.AddHandler(handler)

	err = c.ConnectToNSQLookupd(addr)
	if err != nil {
		log.Println("init Consumer ConnectToNSQLookupd error:",err)
		return err
	}
	return nil
}

 

結果:

生產者:

 

消費者:

 

也可以通過nsqadmin   http://localhost:4171/   檢視nsq相關資訊