RocketMQ原理解析-consumer 3.長輪詢
Rocketmq的訊息是由consumer端主動到broker拉取的, consumer向broker傳送拉訊息請求, PullMessageService服務通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取訊息
DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法執行向broker拉訊息動作
1. 獲取ProcessQueue判讀是否drop的, drop為true返回
2. 給ProcessQueue設定拉訊息時間戳
3. 流量控制,正在消費佇列中訊息(未被消費的)超過閥值,稍後在執行拉訊息
4. 流量控制,正在消費佇列中訊息的跨度超過閥值(預設2000),稍後在消費
5. 根據topic獲取訂閱關係
6. 構建拉訊息回撥物件PullBack, 從broker拉取訊息(非同步拉取)返回結果是回撥
7. 從記憶體中獲取commitOffsetValue //TODO 這個值跟pullRequest.getNextOffset區別
8. 構建sysFlag pull介面用到的flag
9. 調底層通訊層向broker傳送拉訊息請求
如果master壓力過大,會建議去slave拉取訊息
如果是到broker拉取訊息清楚實時提交標記位,因為slave不允許實時提交消費進度,可以定時提交
//TODO 關於master拉訊息實時提交指的是什麼?
10. 拉到訊息後回撥PullCallback
處理broker返回結果pullResult
更新從哪個broker(master 還是slave)拉取訊息
反序列化訊息
訊息過濾
訊息中放入佇列最大最小offset,方便應用來感知訊息堆積度
將訊息加入正在處理佇列ProcessQueue
將訊息提交到消費訊息服務ConsumeMessageService
流控處理, 如果pullInterval引數大於0 (拉訊息間隔,如果為了降低拉取速度,可以設定大於0的值),延遲再執行拉訊息, 如果pullInterval為0立刻在執行拉訊息動作
序列圖
1. 向broker傳送長輪詢請求
2. Broker接收長輪詢請求
3. Consumer接收broker響應
長輪詢活動圖:
一張圖畫不下,再來一張