NSQ原始碼分析(五)——Channel
Channel相關的程式碼主要位於nsqd/channel.go
, nsqd/nsqd.go
中。
Channel是消費者訂閱特定Topic的一種抽象。對於發往Topic的訊息,nsqd向該Topic下的所有Channel投遞訊息,而同一個Channel只投遞一次,Channel下如果存在多個消費者,則隨機選擇一個消費者做投遞。這種投遞方式可以被用作消費者負載均衡。Channel會將訊息進行排列,如果沒有消費者讀取訊息,訊息首先會在記憶體中排隊,當量太大時就會被儲存到磁碟中。
一、Channel的建立和初始化
1.初始化Channel,初始化topicName,name,memoryMsgChan,ctx,clients及刪除函式deleteCallback
2.給e2eProcessingLatencyStream賦值,主要用於統計訊息投遞的延遲等
3.initPQ函式建立了兩個mapinFlightMessages、deferredMessages和兩個佇列inFlightPQ和deferredPQ。主要用於索引和存放這兩類訊息
4.初始化backend為diskqueue,磁碟儲存的訊息檔案
5.通知nsqd建立了Channel
func NewChannel(topicName string, channelName string, ctx *context, deleteCallback func(*Channel)) *Channel { c := &Channel{ topicName: topicName, name: channelName, memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), clients: make(map[int64]Consumer), deleteCallback: deleteCallback, ctx: ctx, } if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { c.e2eProcessingLatencyStream = quantile.New( ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles, ) } c.initPQ() if strings.HasSuffix(channelName, "#ephemeral") { c.ephemeral = true c.backend = newDummyBackendQueue() } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { opts := ctx.nsqd.getOpts() lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...) } // backend names, for uniqueness, automatically include the topic... backendName := getBackendName(topicName, channelName) c.backend = diskqueue.New( backendName, ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, ctx.nsqd.getOpts().SyncEvery, ctx.nsqd.getOpts().SyncTimeout, dqLogf, ) } c.ctx.nsqd.Notify(c) return c }
initPQ函式
initPQ 主要用於索引和存放這兩類訊息
1.獲取佇列緩衝長度pgSize 值為 1和MemQueueSize/10的最大值,MemQueueSize的預設值為10000
2.初始化inFlightMessages,儲存Message的MessageID和Message的對應關係
3.初始化inFlightPQ佇列,正在投遞但還沒確認投遞成功的訊息
4.初始化deferredMessages 和 deferredPQ ,deferredPQ 佇列是延時訊息和投遞失敗等待指定時間後重新投遞的訊息
func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.ctx.nsqd.getOpts().MemQueueSize)/10))
c.inFlightMutex.Lock()
c.inFlightMessages = make(map[MessageID]*Message)
c.inFlightPQ = newInFlightPqueue(pqSize)
c.inFlightMutex.Unlock()
c.deferredMutex.Lock()
c.deferredMessages = make(map[MessageID]*pqueue.Item)
c.deferredPQ = pqueue.New(pqSize)
c.deferredMutex.Unlock()
}
二、Channel中的訊息來源
在分析Topic時提到,訊息進入Topic的訊息迴圈後會被投遞到該Topic下所有的Channel,由Channel的PutMessage函式進行處理。
以下是topic的messagePump函式的片段(原始碼在nsq/nsqd/topic.go檔案中的messagePump函式)
func (t *Topic) messagePump() {
......
for {
......
for i, channel := range chans { //遍歷topic的所有的channel
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
//複製訊息,因為每個channel需要唯一的例項
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 { //傳送延時訊息
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
//傳送即時訊息
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
}
從中我們看到topic將Message傳送給所有關聯的Channels,訊息有兩種即時訊息和延時訊息
Channel接收到延時訊息的處理流程
1.Channel中的messageCount自增,messageCount也就是訊息數量
2.呼叫StartDeferredTimeout函式,將訊息維護到pushDeferredMessage和deferredPQ優先順序佇列中
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
atomic.AddUint64(&c.messageCount, 1)
c.StartDeferredTimeout(msg, timeout)
}
繼續來看StartDeferredTimeout函式
將訊息新增到deferredMessages 和 deferredPQ 佇列中等待投遞
1.初始化item,Priority的值為當前時間+延時時間的時間戳
2.呼叫pushDeferredMessage函式將訊息新增到pushDeferredMessage中,pushDeferredMessage該map中儲存了MessageID和Message的對應關係
3.呼叫addToDeferredPQ將item新增到deferredPQ優先順序佇列中
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
absTs := time.Now().Add(timeout).UnixNano()
item := &pqueue.Item{Value: msg, Priority: absTs}
err := c.pushDeferredMessage(item)
if err != nil {
return err
}
c.addToDeferredPQ(item)
return nil
}
pushDeferredMessage函式
//向deferredMessages map中新增重新投遞的訊息資訊
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
c.deferredMutex.Lock()
// TODO: these map lookups are costly
id := item.Value.(*Message).ID
_, ok := c.deferredMessages[id]
if ok {
c.deferredMutex.Unlock()
return errors.New("ID already deferred")
}
c.deferredMessages[id] = item
c.deferredMutex.Unlock()
return nil
}
addToDeferredPQ函式
//向deferredPQ佇列中新增元素
func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
c.deferredMutex.Lock()
heap.Push(&c.deferredPQ, item)
c.deferredMutex.Unlock()
}
Channel接收到即時訊息的處理流程
1.如果檔案channel已經退出,則返回錯誤
2.呼叫put(m),將訊息寫到記憶體佇列memoryMsgChan或磁碟檔案中
3.將該channel的訊息數量原子性加1
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
if c.Exiting() { //channel已經退出
return errors.New("exiting")
}
err := c.put(m)
if err != nil {
return err
}
atomic.AddUint64(&c.messageCount, 1)
return nil
}
put函式
1.memoryMsgChan記憶體佇列預設緩衝是10000,如果memoryMsgChan已滿,則寫入到硬碟中
2.通過bufferPoolGet函式從buffer池中獲取一個buffer,bufferPoolGet及以下bufferPoolPut函式是對sync.Pool的簡單包裝。兩個函式位於nsqd/buffer_pool.go中。
3.呼叫writeMessageToBackend函式將訊息寫入磁碟檔案中。
4.通過bufferPoolPut函式將buffer歸還buffer池。
5.呼叫SetHealth函式將writeMessageToBackend的返回值寫入errValue變數。該變數衍生出IsHealthy,GetError和GetHealth3個函式,主要用於測試以及從HTTP API獲取nsqd的執行情況(是否發生錯誤)
func (c *Channel) put(m *Message) error {
select {
case c.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.name, err)
return err
}
}
return nil
}
三、Channel中訊息的投遞
Channel中的訊息是要投遞給客戶端(消費者),第一節講到在tcp server監聽到有新的客戶端連線時會開啟一個協程,呼叫protocol_v2檔案中的IOLoop(conn net.Conn)進行客戶端讀寫操作。在IOLoop函式中會開啟一個協程呼叫messagePump函式來輪詢將Channel中的訊息寫給客戶端。下面我們主要來看下messagePump函式
原始碼在nsq/nsqd/protocol_v2.go檔案中
處理channel中的訊息,channel接收到的訊息主要在memoryMsgChan和磁碟檔案中
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
.......
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg, err := decodeMessage(b)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++ //投遞嘗試的次數
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
}
看到無論是從磁碟中取出的訊息還是從記憶體佇列中取出的訊息,執行的流程差不多。
1.msg的Attempts自增(訊息嘗試投遞的次數)
2.呼叫StartInFlightTimeout函式將本條訊息msg新增到inFlightMessages和inFlightPQ優先佇列中 (inFlightMessages和inFlightPQ存放已投遞但不確定是否投遞成功的訊息)
3.呼叫SendingMessage函式將clientV2中的InFlightCount和MessageCount自增
4.呼叫SendMessage函式將訊息傳送給客戶端
四、訊息投遞後的處理
客戶端成功消費一條訊息後,會發送一個FIN訊息,帶上message ID 或者客戶端如果消費失敗,也會發送一個REQ的請求。IOLoop函式中除了開啟一個協程呼叫messagePump函式輪詢的投遞Channel中的訊息,for迴圈模組中也在輪詢讀取從客戶端返回的訊息。
func (p *protocolV2) IOLoop(conn net.Conn) error {
.....
for {
......
line, err = client.Reader.ReadSlice('\n')
if err != nil {
if err == io.EOF {
err = nil
} else {
err = fmt.Errorf("failed to read command - %s", err)
}
break
}
/*
去除行尾的\n \r,並按空格切分成params
*/
// trim the '\n'
line = line[:len(line)-1]
// optionally trim the '\r'
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes)
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
var response []byte
response, err = p.Exec(client, params)
}
}
訊息投送成功的處理
客戶端成功消費一條訊息後,會發送一個FIN訊息。會執行到Exec函式中的FIN流程,最後呼叫FIN函式
1.獲取訊息id
2.呼叫FinishMessage方法,從 inFlightMessages 和 inFlightPQ 佇列中移除該訊息
3.呼叫 FinishedMessage將該clientV2的FinishCount增1,InFlightCount減1,並並向ReadStateChan傳送一個訊息;如果服務端因為RDY限制停止推送訊息,收到這個訊息後,也會重新檢視是否可以繼續推送訊息。
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
state := atomic.LoadInt32(&client.State)
if state != stateSubscribed && state != stateClosing {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state")
}
if len(params) < 2 {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params")
}
id, err := getMessageID(params[1])
if err != nil {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())
}
err = client.Channel.FinishMessage(client.ID, *id)
if err != nil {
return nil, protocol.NewClientErr(err, "E_FIN_FAILED",
fmt.Sprintf("FIN %s failed %s", *id, err.Error()))
}
client.FinishedMessage()
return nil, nil
}
訊息投遞失敗的處理
訊息投遞失敗的處理流程主要在REQ函式中
1.獲取訊息id
2.獲取timeoutDuration的值
3.呼叫RequeueMessage方法,將訊息msg 根據訊息id從inFlightMessages和inFlightPQ佇列中移除,並根據timeoutDuration的值決定將該訊息新增到deferredMessages 和 deferredPQ 佇列中,還是放到memoryMsgChan或磁碟檔案中 並等待下次投遞
4.呼叫RequeuedMessage方法,將clientV2的RequeueCount值增1,將InFlightCount,減1,並並向ReadStateChan傳送一個訊息
func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) {
state := atomic.LoadInt32(&client.State)
if state != stateSubscribed && state != stateClosing {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state")
}
if len(params) < 3 {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params")
}
id, err := getMessageID(params[1])
if err != nil {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())
}
timeoutMs, err := protocol.ByteToBase10(params[2])
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_INVALID",
fmt.Sprintf("REQ could not parse timeout %s", params[2]))
}
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
maxReqTimeout := p.ctx.nsqd.getOpts().MaxReqTimeout
clampedTimeout := timeoutDuration
if timeoutDuration < 0 {
clampedTimeout = 0
} else if timeoutDuration > maxReqTimeout {
clampedTimeout = maxReqTimeout
}
if clampedTimeout != timeoutDuration {
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] REQ timeout %d out of range 0-%d. Setting to %d",
client, timeoutDuration, maxReqTimeout, clampedTimeout)
timeoutDuration = clampedTimeout
}
err = client.Channel.RequeueMessage(client.ID, *id, timeoutDuration)
if err != nil {
return nil, protocol.NewClientErr(err, "E_REQ_FAILED",
fmt.Sprintf("REQ %s failed %s", *id, err.Error()))
}
client.RequeuedMessage()
return nil, nil
}
RequeueMessage函式是訊息投遞失敗的主要流程
1.將訊息msg 根據訊息id從inFlightMessages和inFlightPQ佇列中移除
2.如果timeout為0,則將該訊息重新新增到memoryMsgChan或磁碟檔案中,等待下次投遞
3.如果timeout大於0,則將訊息新增到deferredMessages 和 deferredPQ 佇列中等待重新投遞
func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
// remove from inflight first
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
c.removeFromInFlightPQ(msg)
atomic.AddUint64(&c.requeueCount, 1)
if timeout == 0 {
c.exitMutex.RLock()
if c.Exiting() {
c.exitMutex.RUnlock()
return errors.New("exiting")
}
err := c.put(msg)
c.exitMutex.RUnlock()
return err
}
// deferred requeue
return c.StartDeferredTimeout(msg, timeout)
}
(1)timeout為0的情況(timeout可以理解成訊息投遞失敗後,需要等待多久之後再投遞)
呼叫put函式將訊息寫到memoryMsgChan或磁碟檔案中,前面已經介紹過這個函式,這裡就不在詳細說明。
func (c *Channel) put(m *Message) error {
select {
case c.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.name, err)
return err
}
}
return nil
}
(2)timeout大於0的情況
呼叫StartDeferredTimeout函式將訊息寫入到pushDeferredMessage 和 deferredPQ中。這個函式在前面Channel中獲取延時訊息也是呼叫這個函式。
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
absTs := time.Now().Add(timeout).UnixNano()
item := &pqueue.Item{Value: msg, Priority: absTs}
err := c.pushDeferredMessage(item)
if err != nil {
return err
}
c.addToDeferredPQ(item)
return nil
}
五、Channel的暫停和取消暫停
Channel的暫停和取消暫停和Topic的操作一樣,由Channel中paused欄位的值決定,該欄位是原子操作的,paused為1表示暫停狀態,0表示未暫停。
func (c *Channel) Pause() error {
return c.doPause(true)
}
func (c *Channel) UnPause() error {
return c.doPause(false)
}
//暫停或取消暫停向客戶端傳送訊息
func (c *Channel) doPause(pause bool) error {
if pause {
atomic.StoreInt32(&c.paused, 1)
} else {
atomic.StoreInt32(&c.paused, 0)
}
c.RLock()
for _, client := range c.clients {
if pause {
client.Pause()
} else {
client.UnPause()
}
}
c.RUnlock()
return nil
}
//返回該Channel是否是暫停狀態
func (c *Channel) IsPaused() bool {
return atomic.LoadInt32(&c.paused) == 1
}