1. 程式人生 > >NSQ訊息傳送機制

NSQ訊息傳送機制

NSQ是Go語言編寫的,開源的分散式訊息佇列中介軟體,其設計的目的是用來大規模地處理每天數以十億計級別的訊息。NSQ 具有分散式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證訊息的可靠傳遞的特徵,是一個成熟的、已在大規模生成環境下應用的產品。

nsq元件

nsqd

單個nsqd例項可以用來樹立多個數據流。

topic

資料流也叫topic, 一個topic可以有多個channel。

channel

每一個channel都接收該topic下面的所有訊息,然後傳送給消費者。channel會均勻的傳送訊息給消費者,不重複消費。

Topics and channels 都沒有優先順序,topic在兩種情況下會被使用到,在傳送資料到一該topic時候首次使用,或者有消費者訂閱該topic下的某個channel時候。channel在有消費者訂閱了該channle的時候使用。


這裡寫圖片描述

nsqlookupd

這是一個輔助應用,啟動目錄服務儲存著nsqd的地址,consumer可以通過nsqlookupd查詢到它消費的channel、topic對應的nsqd服務。consumer不需要了解producer的資訊。

nsqadmin

提供了一個webUI,我們可以查詢到topics/channels/consumers這些資訊,便於監管。

nsq訊息傳遞確保

nsq可以確保傳送過來的訊息至少被一個消費者消費;為了確保訊息消費,重複消費是不可避免的。nsq通過以下來確保訊息消費

  1. 客戶端表示它們可以接收訊息
  2. NSQ傳送訊息,暫時將訊息存在本地
  3. 客戶端回覆FIN 或者 REQ 表示成功或者重發。如果客戶端未能及時傳送,則NSQ將重複傳送訊息給該客戶端。

nsq的安裝使用(mac)

安裝: brew install nsq
使用(命令列):

啟動

在一個shell中:nsqlookupd
另一個shell中:nsqd –lookupd-tcp-address=127.0.0.1:4160
第三個shell中:nsqadmin –lookupd-http-address=127.0.0.1:4161

傳送接收

curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/pub?topic=test’ //傳送訊息
nsq_to_file –topic=test –output-dir=/tmp –lookupd-http-address=127.0.0.1:4161

這裡寫圖片描述

值得注意的是:nsq_to_file並不會精確的告訴topic產生的位置,它從nsqlookupd獲取資訊,不會丟掉訊息。

nsq go生產者和消費者

go get github.com/nsqio/go-nsq

傳送端

package main

import (
        "github.com/nsqio/go-nsq"
       )

var producer *nsq.Producer

func main() {
    nsqd := "127.0.0.1:4150"
    producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
    producer.Publish("test", []byte("nihao"))  //topic, message
    if err != nil {
        panic(err)
    }
}

消費端

package main

import (
 "fmt"
 "sync"
 "github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
    return nil
}

func testNSQ() {
    waiter := sync.WaitGroup{}
    waiter.Add(1)

    go func() {
        defer waiter.Done()
        config:=nsq.NewConfig()
        config.MaxInFlight=9

    //建立多個連線
    for i := 0; i<5; i++ {
        consumer, err := nsq.NewConsumer("test", "zj", config)  //topic, channel, config
        if nil != err {
            fmt.Println("err", err)
            return
        }

        consumer.AddHandler(&NSQHandler{})
        err = consumer.ConnectToNSQD("127.0.0.1:4150")
        if nil != err {
            fmt.Println("err", err)
            return
        }
    }
        select{}

    }()

    waiter.Wait()
}
func main() {
        testNSQ();

}

這裡寫圖片描述

nsq效能介紹

Kafka自身服務與訊息的生產和消費都依賴與Zookeeper,使用Scala語言開發。因為其訊息的消費使用客戶端Pull方式,訊息可以被多個客戶端消費,理論上訊息會重複,但是不會丟失(除非訊息過期)。因此比較常用的場景是作為日誌傳輸的訊息平臺。

nsq預設是不儲存訊息,訊息傳遞時候通常是無序的,可以保留資訊去check時間戳,nsq更適合處理資料量大但是彼此間沒有順序關係的訊息。

這裡寫圖片描述

參考文獻

附屬學習資料(go)

Go 內建關鍵字 (25個均為小寫)

break,default,func,interface,select,case,defer,go,map,struct,chan,else,goto,package,switch,const,fall,through,if,range,type,continue,for,import,return,var

Go 的註釋方法

// : 單行註釋
/**/ : 多行註釋

Go 程式的一般結構

通過package來組織結構
只有package名稱為main的包可以包含main函式
一個可執行程式有且只有一個main函式
通過import關鍵字來匯入其他非main包
通過const關鍵字來進行常量的定義
通過在函式體外部使用var關鍵字來進行全域性變數的宣告與賦值
通過type關鍵字來進行結構(struct)或(interface)的宣告
通過func關鍵字來進行函式的宣告

匯入package的方式和注意事項

逐個匯入:

import "fmt"
import "os"
import "time"

統一匯入

import (
"fmt"
"os"
"time"
)

匯入包之後,就可以使用格式. 來對包中的函式進行呼叫
注意: 如果匯入了包未被呼叫,則其中的函式或型別將會報出編譯錯誤

可見性規則

在Go語言中通過大小寫來決定該變數、常量、型別、介面、結構或函式是否可被外部所呼叫
根據約定,函式名首字母小寫即為private, 大寫即為public

package 別名的使用

當匯入第三方包時,包名很可能非常接近或相同,需要使用別名來進行區別和呼叫如:

import std "fmt"
func main() {
    std.Println("HelloWorld!");
}

省略呼叫如:

import . "fmt"
func main() {
    Println("HelloWorld!");
}

省略呼叫的注意事項:不可與別名同時使用且不建議在實際專案中採用