NSQ原始碼分析(二)—— Topic
Topic是NSQ非常重要的概念,本次主要講述Topic的獲取、新建、Topic中訊息的輪詢、Topic中訊息的來源、Topic的刪除和退出以及Topic的暫停和取消暫停
topic的相關操作主要在nsq/nsqd/topic.go中
首先看下Topic結構體
type Topic struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms messageCount uint64 //訊息數量 sync.RWMutex name string //topic的名稱 channelMap map[string]*Channel //該topic關聯的Channel資訊 backend BackendQueue //用於接收訊息,訊息存放到磁碟 memoryMsgChan chan *Message //用於接收訊息,訊息存放到記憶體 startChan chan int //用於阻塞messagePump,直到收到startChan訊號 exitChan chan int //退出的通道 channelUpdateChan chan int //channel有變更時傳送通知的通道 waitGroup util.WaitGroupWrapper exitFlag int32 //退出 idFactory *guidFactory ephemeral bool //是否是臨時的topic deleteCallback func(*Topic) //執行刪除的函式 deleter sync.Once //只執行一次 paused int32 //是否暫停 1 暫停,0不暫停 pauseChan chan int //傳送暫停或取消暫停的訊息 ctx *context //上下文,儲存nsqd指標 }
一、Topic的獲取
GetTopic以協程安全執行並返回指向Topic物件的指標(可能是新建立的)
1.從nsqd例項的topicMap中獲取,如果有則返回
2.如果topicMap中沒有對應的topic資訊,則新建topic(呼叫NewTopic)
3.新建topic後,從nsqlookupd獲取對應的channel名稱
4.如果channel名稱不存在topic的channelMap中,則新建channel
func (n *NSQD) GetTopic(topicName string) *Topic { // most likely, we already have this topic, so try read lock first. n.RLock() //從topicMap中讀取,如果有,則返回 t, ok := n.topicMap[topicName] n.RUnlock() if ok { return t } //這次用通用鎖來處理 n.Lock() t, ok = n.topicMap[topicName] if ok { n.Unlock() return t } //topicMap中沒有,則是第一次使用的topic,需要新建 //定義刪除函式 deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } //建立topic t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.Unlock() n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // topic is created but messagePump not yet started // if loading metadata at startup, no lookupd connections yet, topic started after load //如果啟動時正在載入元資料,沒有連線nsqlookupd,topic在載入完元資料後啟動 if atomic.LoadInt32(&n.isLoading) == 1 { return t } // if using lookupd, make a blocking call to get the topics, and immediately create them. // this makes sure that any message received is buffered to the right channels //獲取nsqd所配置的lookupd的http地址 lookupdHTTPAddrs := n.lookupdHTTPAddrs() if len(lookupdHTTPAddrs) > 0 { //根據topic名稱,從nsqlookupd獲取所有關聯的channel名稱 channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) if err != nil { n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err) } for _, channelName := range channelNames { if strings.HasSuffix(channelName, "#ephemeral") {//臨時的 continue // do not create ephemeral channel with no consumer client } t.GetChannel(channelName) } } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 { n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name) } // now that all channels are added, start topic messagePump t.Start() return t }
ps:提出兩個問題可以思考一下
什麼時候會呼叫GetTopic函式?
1.在nsqd啟動的時候呼叫LoadMetadata() 載入元資料時會呼叫GetTopic用於初始化元資料中的Topic和Channel
2.當生產者PUB訊息的時候,會指定topic,這時也會呼叫GetTopic
為什麼topic對應的channel名稱要從nsqlookupd中獲取?
可以參考:https://blog.csdn.net/skh2015java/article/details/82747450
當消費者消費訊息時會指定topic及對應的channel,所以nsqlookupd中維護者topic和channel的對應關係(一對多)
二、Topic的新建
topic分為臨時topic和永久topic,臨時topic backend變數使用newDummyBackendQueue函式初始化。該函式生成一個無任何功能的dummyBackendQueue結構;
對於永久的topic,backend使用newDiskQueue函式返回diskQueue型別賦值,並開啟新的goroutine來進行資料的持久化。
1.初始化Topic結構體
2.對於非臨時topic,則初始化topic的backend為diskQueue,diskQueue是記錄在磁碟檔案中的FIFO佇列(當記憶體佇列滿的時候會用到該磁碟佇列)
3.開啟協程呼叫messagePump函式,messagePump的作用是將受到的佇列(記憶體佇列topic中的memoryMsgChan和磁碟佇列disQueue)中的訊息投遞到topic關聯的所有channel中
4.通知nsqd新建了topic
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), //記憶體訊息佇列,有快取,MemQueueSize預設是10000
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx,
paused: 0,
pauseChan: make(chan int),
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
if strings.HasSuffix(topicName, "#ephemeral") { //如果topic的名稱以#ephemeral開頭,如果是則是臨時topic
t.ephemeral = true
t.backend = newDummyBackendQueue()
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
opts := ctx.nsqd.getOpts()
lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...)
}
t.backend = diskqueue.New(
topicName, //topic名稱
ctx.nsqd.getOpts().DataPath, //資料路徑
ctx.nsqd.getOpts().MaxBytesPerFile,//每個檔案的最大位元組數
int32(minValidMsgLength), //每條訊息的最小長度
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, //每條訊息的最大長度
ctx.nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout,
dqLogf,
)
}
t.waitGroup.Wrap(t.messagePump)
//通知nsqd新建了Topic
t.ctx.nsqd.Notify(t)
return t
}
messagePump訊息輪詢
messagePump函式主要作用輪詢將記憶體佇列和磁碟佇列中的訊息投遞給該topic關聯的所有Channel,channel的更新,暫停或取消暫停及退出等
1.不在Start()函式呼叫之前接收訊息,即不跳出第一個for迴圈
2.獲取所有該topic對應的Channel
3.當topic對應的Channel的數量大於0,並且該topic不是暫停狀態時初始化memoryMsgChan和backendChan
4.第二個for迴圈中的流程
(1)從記憶體佇列或磁碟檔案中獲取訊息,並投遞給所有關聯的Channel
(2)channel更新
(3)暫停或取消暫停
(4)退出
func (t *Topic) messagePump() {
var msg *Message
var buf []byte
var err error
var chans []*Channel //該topic對應的所有Channel
var memoryMsgChan chan *Message //記憶體訊息佇列
var backendChan chan []byte //backend佇列
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
//不在Start()函式呼叫之前接收訊息
for {
select {
case <-t.channelUpdateChan: //channel update的訊息通知
continue
case <-t.pauseChan: //暫停
continue
case <-t.exitChan: //退出
goto exit
case <-t.startChan:
}
break
}
t.RLock()
for _, c := range t.channelMap { //獲取所有該topic對應的Channel
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 && !t.IsPaused() { //如果Channel的長度大於0,並且topic不是暫停狀態
memoryMsgChan = t.memoryMsgChan //獲取記憶體佇列
backendChan = t.backend.ReadChan() //獲取backendChan
}
// main message loop
for {
select {
case msg = <-memoryMsgChan: //從記憶體中獲取
case buf = <-backendChan: //從backendChan中獲取
msg, err = decodeMessage(buf) //需要將buf解碼成msg
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan: //channel更新
chans = chans[:0] //從新獲取chans
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() { //如果Channel的個數為0或者topic是暫停,則將memoryMsgChan和backendChan置為nil
memoryMsgChan = nil
backendChan = nil
} else { //負責重新指定memoryMsgChan和backendChan
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.pauseChan: //暫停
if len(chans) == 0 || t.IsPaused() { //如果Channel的個數為0或者topic是暫停,則將memoryMsgChan和backendChan置為nil
memoryMsgChan = nil
backendChan = nil
} else { //負責重新指定memoryMsgChan和backendChan
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan: //退出
goto exit
}
//以下為處理收到的msg
for i, channel := range chans { //遍歷topic的所有的channel
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
//複製訊息,因為每個channel需要唯一的例項
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 { //傳送延時訊息
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
//傳送即時訊息
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
exit:
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
三、Topic中訊息的獲取
topic.go檔案中的put函式,是topic中訊息的獲取來源
上面也提到topic中的訊息存在topic中的memoryMsgChan佇列中,該佇列的預設長度是10000,如果該佇列滿了,則會將訊息存到disQueue磁碟檔案中
當生產者PUB訊息時,會根據topic的名稱,將訊息寫入到對應topic的佇列中
1.如果memoryMsgChan佇列沒滿,則將訊息寫入該佇列
2.如果滿了則將訊息寫入到磁碟中
(1)通過bufferPoolGet函式從buffer池中獲取一個buffer,bufferPoolGet及以下bufferPoolPut函式是對sync.Pool的簡單包裝。 兩個函式位於nsqd/buffer_pool.go中。
(2)呼叫writeMessageToBackend函式將訊息寫入磁碟。
(3)通過bufferPoolPut函式將buffer歸還buffer池。
(4)呼叫SetHealth函式將writeMessageToBackend的返回值寫入errValue變數。 該變數衍生出IsHealthy,GetError和GetHealth3個函式,主要用於測試以及從HTTP API獲取nsqd的執行情況(是否發生錯誤)
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
四、Topic的關閉和刪除
1.deleted為true,則通知nsqd
2.close(t.exitChan) 退出memoryMsgChan函式
3.如果deleted為true,則
(1)刪除該topic對應的channel
(2)channel.Delete()清空佇列中的訊息並關閉退出
(3)t.Empty()清空記憶體佇列和磁碟檔案中的訊息
4.如果deleted為false,則
(1)關閉topic對應的channel
(2)呼叫t.flush()將記憶體佇列memoryMsgChan中的訊息寫入到磁碟檔案中
(3)關閉並退出disQueue(檔案中的訊息是不刪除的)
總結來說:
對於刪除操作,需要清空channelMap並刪除所有channel,然後刪除記憶體和磁碟中所有未投遞的訊息。最後關閉backend管理的的磁碟檔案。
對於關閉操作,不清空channelMap,只是關閉所有的channel,使用flush函式將所有memoryMsgChan中未投遞的訊息用writeMessageToBackend儲存到磁碟中。最後關閉backend管理的的磁碟檔案。
// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
return t.exit(true)
}
// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
return t.exit(false)
}
func (t *Topic) exit(deleted bool) error {
if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
return errors.New("exiting")
}
if deleted {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
t.ctx.nsqd.Notify(t)
} else {
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
}
close(t.exitChan)
// synchronize the close of messagePump()
t.waitGroup.Wait()
if deleted {
t.Lock()
for _, channel := range t.channelMap {
delete(t.channelMap, channel.name)
channel.Delete()
}
t.Unlock()
// empty the queue (deletes the backend files, too)
t.Empty()
return t.backend.Delete()
}
// close all the channels
for _, channel := range t.channelMap {
err := channel.Close()
if err != nil {
// we need to continue regardless of error to close all the channels
t.ctx.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
}
}
// write anything leftover to disk
t.flush()
return t.backend.Close()
}
五、Topic的暫停和取消暫停
topic的暫停和取消暫停主要是通過原子操作topic中的paused欄位來實現的,paused的值為1則是暫停,0是非暫停狀態
程式碼比較簡單
func (t *Topic) Pause() error {
return t.doPause(true)
}
func (t *Topic) UnPause() error {
return t.doPause(false)
}
func (t *Topic) doPause(pause bool) error {
if pause {
atomic.StoreInt32(&t.paused, 1)
} else {
atomic.StoreInt32(&t.paused, 0)
}
select {
case t.pauseChan <- 1:
case <-t.exitChan:
}
return nil
}
func (t *Topic) IsPaused() bool {
return atomic.LoadInt32(&t.paused) == 1
}