1. 程式人生 > >nsq 快速入門經驗分享

nsq 快速入門經驗分享

nsq 是什麼東西,這裡就不長篇大論啦

我在 Mac 上嘗試 nsq.io 中提供的 QUICK START 遇到問題,這裡作簡要說明

當執行到第 6 步時,nsq_to_file 報如下錯誤:
error connecting to nsqd - dial tcp: i/o timeout
同時當你訪問 http://127.0.0.1:4171/ 點選某些選單的時候也會報錯

對於第一次部署的我,這種問題猶如晴天霹靂,因為那可是官方提供的教程啊

還好這種問題經不起推敲,大意就是連不上 nsqd 唄,但是為什麼會連不上呢?我在 nsqd 命令列引數裡找到了答案
-broadcast-address="": address that will be registered with lookupd (defaults to the OS hostname)


在 Mac 上應該顯式指定
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
我想正如上面說的那樣,預設是作業系統主機名,這樣 nsqd 雖然能繫結成功,但是客戶端就是無法識別

關於 nsq 我還發現一個小小細節
nsqd 提供的 HTTP API /pub publish a message to a topic(釋出訊息到話題)
但真正在實際操作的時候卻是這樣的,兩種方式都可以?

curl -d "<message>" http://127.0.0.1:4151/put?topic=name
curl -d "<message>" http://127.0.0.1:4151/pub?topic=name

關於 nsq 入門我也僅有一天多的功力,分享不了太多有價值的東西,但是通過文件的閱讀和實踐,這裡我作一下總結:
官方文件(其實就是上面的 QUICK START)
中文翻譯(拜讀部分翻譯,表示感謝)
go-nsq Golang客戶端庫(官方客戶端開發庫)

這裡給出我用客戶端開發庫寫的測試程式碼

package main

import (
    "fmt"
    "time"

    "github.com/nsqio/go-nsq"
)

// ConsumerHandler 消費者處理者
type ConsumerHandler struct{} // HandleMessage 處理訊息 func (*ConsumerHandler) HandleMessage(msg *nsq.Message) error { fmt.Println(string(msg.Body)) return nil } // Producer 生產者 func Producer() { producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) if err != nil { fmt.Println("NewProducer", err) panic(err) } i := 1 for { if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil { fmt.Println("Publish", err) panic(err) } time.Sleep(time.Second * 5) i++ } } // ConsumerA 消費者 func ConsumerA() { consumer, err := nsq.NewConsumer("test", "test-channel-a", nsq.NewConfig()) if err != nil { fmt.Println("NewConsumer", err) panic(err) } consumer.AddHandler(&ConsumerHandler{}) if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil { fmt.Println("ConnectToNSQLookupd", err) panic(err) } } // ConsumerB 消費者 func ConsumerB() { consumer, err := nsq.NewConsumer("test", "test-channel-b", nsq.NewConfig()) if err != nil { fmt.Println("NewConsumer", err) panic(err) } consumer.AddHandler(&ConsumerHandler{}) if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil { fmt.Println("ConnectToNSQLookupd", err) panic(err) } } func main() { ConsumerA() ConsumerB() Producer() }

命令執行順序如下

nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161

測試程式執行列印如下

banjakukutekiiMac:test panshiqu$ ./main 
2016/11/24 09:52:49 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR    1 [test/test-channel-a] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF    2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:52:49 ERR    2 [test/test-channel-b] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=test) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
2016/11/24 09:52:49 INF    3 (127.0.0.1:4150) connecting to nsqd
2016/11/24 09:53:57 INF    2 [test/test-channel-b] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:53:57 INF    2 [test/test-channel-b] (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3
Hello World 4
Hello World 5
Hello World 6
Hello World 7
Hello World 8
Hello World 9
Hello World 10
Hello World 11
Hello World 12
Hello World 13
Hello World 14
Hello World 15
2016/11/24 09:54:01 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 09:54:01 INF    1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
Hello World 16
Hello World 16
Hello World 17
Hello World 17
Hello World 18
Hello World 18
...

對於輸出我作如下理解,因為初次啟動 nsq 相關程式,ConsumerA[test/test-channel-a] 查詢 nsqlookupd 主題為 test,返回錯誤,主題不存在。ConsumerB[test/test-channel-b] 也執行上面的動作。這個時候應該不會建立兩個 channel,test-channel-a 和 test-channel-b,也不會建立主題。接下來 Producer 成功連線 nsqd,這個時候會建立 test 主題。等待了一會後 ConsumerB 嘗試查詢主題成功,進而連線 nsqd,成功建立 test-channel-b,消費已被生產出的 15 條訊息,因為 test-channel-a 還未被建立,所以目前已有的訊息是不會被複制分發的。接著 ConsumerA 嘗試查詢主題成功,進而連線 nsqd,成功建立 test-channel-a,接下來的訊息都是被複制分發的,兩個消費者都能收到

兩個 channel 都指定為 test-channel-a 將得到如下輸出,可以確定的是多個消費者守在同一個 channel 中,同一條訊息將只會被一個消費者處理

banjakukutekiiMac:test panshiqu$ go run main.go
2016/11/24 10:23:52 INF    1 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF    1 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF    2 [test/test-channel-a] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2016/11/24 10:23:52 INF    2 [test/test-channel-a] (127.0.0.1:4150) connecting to nsqd
2016/11/24 10:23:52 INF    3 (127.0.0.1:4150) connecting to nsqd
Hello World 1
Hello World 2
Hello World 3

提醒大家在執行上面流程的時候多去 http://127.0.0.1:4171/ 檢視執行狀態,將會在那裡發現很多內部細節
還有很多情況建議大家也嘗試嘗試,多 nsqd 和 多 nsqlookupd,畢竟人家是支援叢集的

====本文內容已完,宣傳我的專案====
歡迎關注《休閒益智遊戲》微信服務號
謝謝