1. 程式人生 > 實用技巧 >RocketMQ 消費端限流機制

RocketMQ 消費端限流機制

RocketMQ 訊息消費端會從 3 個維度進行限流:

  1. 訊息消費端佇列中積壓的訊息超過 1000 條
  2. 訊息處理佇列中儘管積壓沒有超過 1000 條,但最大偏移量與最小偏移量的差值超過 2000
  3. 訊息處理佇列中積壓的訊息總大小超過 100M

為了方便理解上述三條規則的設計理念,我們首先來看一下消費端的資料結構,如下圖所示:

8

PullMessageService 執行緒會按照佇列向 Broker 拉取一批訊息,然後會存入到 ProcessQueue 佇列中,即所謂的處理佇列,然後再提交到消費端執行緒池中進行訊息消費,訊息消費完成後會將對應的訊息從 ProcessQueue 中移除,然後向 Broker 端提交消費進度,提交的消費偏移量為 ProceeQueue 中的最小偏移量。

規則一:訊息消費端佇列中積壓的訊息超過 1000 條值的就是 ProcessQueue 中存在的訊息條數超過指定值,預設為 1000 條,就觸發限流,限流的具體做法就是暫停向 Broker 拉取該佇列中的訊息,但並不會阻止其他佇列的訊息拉取。例如如果 q0 中積壓的訊息超過 1000 條,但 q1 中積壓的訊息不足 1000,那 q1 佇列中的訊息會繼續消費。其目的就是擔心積壓的訊息太多,如果再繼續拉取,會造成記憶體溢位。

規則二:訊息在 ProcessQueue 中實際上維護的是一個 TreeMap,key 為訊息的偏移量、vlaue 為訊息物件,由於 TreeMap 本身是排序的,故很容易得出最大偏移量與最小偏移量的差值,即有可能存在處理佇列中其實就只有 3 條訊息,但偏移量確超過了 2000,例如如下圖所示:

9

出現這種情況也是非常有可能的,其主要原因就是消費偏移量為 100 的這個執行緒由於某種情況卡主了(“阻塞”了),其他訊息卻能正常消費,這種情況雖然不會造成記憶體溢位,但大概率會造成大量訊息重複消費,究其原因與訊息消費進度的提交機制有關,在 RocketMQ 中,例如訊息偏移量為 2001 的訊息消費成功後,向服務端彙報消費進度時並不是報告 2001,而是取處理佇列中最小偏移量 100,這樣雖然訊息一直在處理,但訊息消費進度始終無法向前推進,試想一下如果此時最大的訊息偏移量為 1000,專案組發現出現了訊息積壓,然後重啟消費端,那訊息就會從 100 開始重新消費,會造成大量訊息重複消費,RocketMQ 為了避免出現大量訊息重複消費,故對此種情況會對其進行限制,超過 2000 就不再拉取訊息了。

規則三:訊息處理佇列中積壓的訊息總大小超過 100M。

這個就更加直接了,不僅從訊息數量考慮,再結合從訊息體大小考慮,處理佇列中訊息總大小超過 100M 進行限流,這個顯而易見就是為了避免記憶體溢位。

在瞭解了 RocketMQ 訊息限流規則後,會在 rocketmq_client.log 中輸出相關的限流日誌,具體搜尋“so do flow control”,詳細如下圖所示:

10