NSQ訊息傳送機制
NSQ是Go語言編寫的,開源的分散式訊息佇列中介軟體,其設計的目的是用來大規模地處理每天數以十億計級別的訊息。NSQ 具有分散式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證訊息的可靠傳遞的特徵,是一個成熟的、已在大規模生成環境下應用的產品。
nsq元件
nsqd
單個nsqd例項可以用來樹立多個數據流。
topic
資料流也叫topic, 一個topic可以有多個channel。
channel
每一個channel都接收該topic下面的所有訊息,然後傳送給消費者。channel會均勻的傳送訊息給消費者,不重複消費。
Topics and channels 都沒有優先順序,topic在兩種情況下會被使用到,在傳送資料到一該topic時候首次使用,或者有消費者訂閱該topic下的某個channel時候。channel在有消費者訂閱了該channle的時候使用。
nsqlookupd
這是一個輔助應用,啟動目錄服務儲存著nsqd的地址,consumer可以通過nsqlookupd查詢到它消費的channel、topic對應的nsqd服務。consumer不需要了解producer的資訊。
nsqadmin
提供了一個webUI,我們可以查詢到topics/channels/consumers這些資訊,便於監管。
nsq訊息傳遞確保
nsq可以確保傳送過來的訊息至少被一個消費者消費;為了確保訊息消費,重複消費是不可避免的。nsq通過以下來確保訊息消費
- 客戶端表示它們可以接收訊息
- NSQ傳送訊息,暫時將訊息存在本地
- 客戶端回覆FIN 或者 REQ 表示成功或者重發。如果客戶端未能及時傳送,則NSQ將重複傳送訊息給該客戶端。
nsq的安裝使用(mac)
安裝: brew install nsq
使用(命令列):
啟動
在一個shell中:nsqlookupd
另一個shell中:nsqd –lookupd-tcp-address=127.0.0.1:4160
第三個shell中:nsqadmin –lookupd-http-address=127.0.0.1:4161
傳送接收
curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/pub?topic=test’ //傳送訊息
nsq_to_file –topic=test –output-dir=/tmp –lookupd-http-address=127.0.0.1:4161
值得注意的是:nsq_to_file並不會精確的告訴topic產生的位置,它從nsqlookupd獲取資訊,不會丟掉訊息。
nsq go生產者和消費者
go get github.com/nsqio/go-nsq
傳送端
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "127.0.0.1:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("nihao")) //topic, message
if err != nil {
panic(err)
}
}
消費端
package main
import (
"fmt"
"sync"
"github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
//建立多個連線
for i := 0; i<5; i++ {
consumer, err := nsq.NewConsumer("test", "zj", config) //topic, channel, config
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
func main() {
testNSQ();
}
nsq效能介紹
Kafka自身服務與訊息的生產和消費都依賴與Zookeeper,使用Scala語言開發。因為其訊息的消費使用客戶端Pull方式,訊息可以被多個客戶端消費,理論上訊息會重複,但是不會丟失(除非訊息過期)。因此比較常用的場景是作為日誌傳輸的訊息平臺。
nsq預設是不儲存訊息,訊息傳遞時候通常是無序的,可以保留資訊去check時間戳,nsq更適合處理資料量大但是彼此間沒有順序關係的訊息。
參考文獻
附屬學習資料(go)
Go 內建關鍵字 (25個均為小寫)
break,default,func,interface,select,case,defer,go,map,struct,chan,else,goto,package,switch,const,fall,through,if,range,type,continue,for,import,return,var
Go 的註釋方法
// : 單行註釋
/**/ : 多行註釋
Go 程式的一般結構
通過package來組織結構
只有package名稱為main的包可以包含main函式
一個可執行程式有且只有一個main函式
通過import關鍵字來匯入其他非main包
通過const關鍵字來進行常量的定義
通過在函式體外部使用var關鍵字來進行全域性變數的宣告與賦值
通過type關鍵字來進行結構(struct)或(interface)的宣告
通過func關鍵字來進行函式的宣告
匯入package的方式和注意事項
逐個匯入:
import "fmt"
import "os"
import "time"
統一匯入
import (
"fmt"
"os"
"time"
)
匯入包之後,就可以使用格式. 來對包中的函式進行呼叫
注意: 如果匯入了包未被呼叫,則其中的函式或型別將會報出編譯錯誤
可見性規則
在Go語言中通過大小寫來決定該變數、常量、型別、介面、結構或函式是否可被外部所呼叫
根據約定,函式名首字母小寫即為private, 大寫即為public
package 別名的使用
當匯入第三方包時,包名很可能非常接近或相同,需要使用別名來進行區別和呼叫如:
import std "fmt"
func main() {
std.Println("HelloWorld!");
}
省略呼叫如:
import . "fmt"
func main() {
Println("HelloWorld!");
}
省略呼叫的注意事項:不可與別名同時使用且不建議在實際專案中採用