剖析nsq訊息佇列(四) 訊息的負載處理
剖析nsq訊息佇列-目錄
實際應用中,一部分服務叢集可能會同時訂閱同一個topic
,並且處於同一個channel
下。當nsqd
有訊息需要傳送給訂閱客戶端去處理時,發給哪個客戶端是需要考慮的,也就是我要說的訊息的負載。
如果不考慮負載情況,把隨機的把訊息傳送到某一個客服端去處理訊息,如果機器的效能不同,可能發生的情況就是某一個或幾個客戶端處理速度慢,但還有大量新的訊息需要處理,其他的客戶端處於空閒狀態。理想的狀態是,找到當前相對空閒的客戶端去處理訊息。
nsq
的處理方式是客戶端主動向nsqd
報告自已的可處理訊息數量(也就是RDY
命令)。nsqd
根據每個連線的客戶端的可處理訊息的狀態來隨機把訊息傳送到可用的客戶端,來進行訊息處理
如下圖所示:
客戶端更新RDY
從第一篇帖子的例子中我們就有配置consumer的config
config := nsq.NewConfig()
config.MaxInFlight = 1000
config.MaxBackoffDuration = 5 * time.Second
config.DialTimeout = 10 * time.Second
MaxInFlight
來設定最大的處理中的訊息數量,會根據這個變數計算在是否更新RDY
初始化的時候 客戶端會向連線的nsqd服務端來發送updateRDY來設定最大處理數,
func (r *Consumer) maybeUpdateRDY(conn *Conn) { inBackoff := r.inBackoff() inBackoffTimeout := r.inBackoffTimeout() if inBackoff || inBackoffTimeout { r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v", conn, inBackoff, inBackoffTimeout) return } remain := conn.RDY() lastRdyCount := conn.LastRDY() count := r.perConnMaxInFlight() // refill when at 1, or at 25%, or if connections have changed and we're imbalanced if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) { r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)", conn, count, remain, lastRdyCount) r.updateRDY(conn, count) } else { r.log(LogLevelDebug, "(%s) skip sending RDY %d (%d remain out of last RDY %d)", conn, count, remain, lastRdyCount) } }
當剩餘的可用處理數量remain
小於等於1,或者小於最後一次設定的可用數量lastRdyCount
的1/4時,或者可用連線平均的maxInFlight大於0並且小於remain
時,則更新RDY
狀態
當有多個nsqd
時,會把最大的訊息進行平均計算:
// perConnMaxInFlight calculates the per-connection max-in-flight count. // // This may change dynamically based on the number of connections to nsqd the Consumer // is responsible for. func (r *Consumer) perConnMaxInFlight() int64 { b := float64(r.getMaxInFlight()) s := b / float64(len(r.conns())) return int64(math.Min(math.Max(1, s), b)) }
當有訊息從nsqd
傳送過來後也會呼叫maybeUpdateRDY
方法,計算是否需要傳送RDY
命令
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
r.maybeUpdateRDY(c)
}
上面就是主要的處理邏輯,但還有一些邏輯,就是當訊息處理髮生錯誤時,nsq
有自己的退避演算法backoff
也會更新RDY
簡單來說就是當發現有處理錯誤時,來進行重試和指數退避,在退避期間RDY
會為0,重試時會先放嘗試RDY
為1看有沒有錯誤,如果沒有錯誤則全部放開,這個演算法這篇帖子我就不詳細說了。
服務端nsqd選擇客戶端進行傳送訊息
同時訂閱同一topic
的客戶端(comsumer)有很多個,每個客戶端根據自己的配置或狀態傳送RDY
命令到nsqd
表明自己能處理多少訊息量
nsqd服務端會檢查每個客戶端的的狀態是否可以傳送訊息。也就是IsReadyForMessages
方法,判斷inFlightCount是否大於readyCount,如果大於或者等於就不再給客戶端傳送資料,等待Ready
後才會再給客戶端傳送資料
func (c *clientV2) IsReadyForMessages() bool {
if c.Channel.IsPaused() {
return false
}
readyCount := atomic.LoadInt64(&c.ReadyCount)
inFlightCount := atomic.LoadInt64(&c.InFlightCount)
c.ctx.nsqd.logf(LOG_DEBUG, "[%s] state rdy: %4d inflt: %4d", c, readyCount, inFlightCount)
if inFlightCount >= readyCount || readyCount <= 0 {
return false
}
return true
每一次傳送訊息inFlightCount
會+1並儲存到傳送中的佇列中,當客戶端傳送FIN會-1在之前的帖子中有說過。
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
// ...
for {
// 檢查訂閱狀態和訊息是否可處理狀態
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// ...
flushed = true
} else if flushed {
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
select {
case <-flusherChan:
// ...
// 訊息處理
case b := <-backendMsgChan:
client.SendingMessage()
// ...
case msg := <-memoryMsgChan:
client.SendingMessage()
//...
}
}
// ...
}