關於RocketMQ訊息消費與重平衡的一些問題探討
其實最好的學習方式就是互相交流,最近也有跟網友討論了一些關於 RocketMQ 訊息拉取與重平衡的問題,我姑且在這裡寫下我的一些總結。
關於 push 模式下的訊息迴圈拉取問題
之前發表了一篇關於重平衡的文章:「Kafka 重平衡機制」,裡面有說到 RocketMQ 重平衡機制是每隔 20s 從任意一個 Broker 節點獲取消費組的消費 ID 以及訂閱資訊,再根據這些訂閱資訊進行分配,然後將分配到的資訊封裝成 pullRequest 物件 pull 到 pullRequestQueue 佇列中,拉取執行緒喚醒後執行拉取任務,流程圖如下:
但是其中有一些是沒有詳細說的,比如每次拉訊息都要等 20s 嗎?真的有個網友問了我如下問題:
很顯然他的專案是用了 push 模式進行訊息拉取,要回答這個問題,就要從 RockeMQ 的訊息拉取說起:
RocketMQ 的 push 模式的實現是基於 pull 模式,只不過在 pull 模式上套了一層,所以RocketMQ push 模式並不是真正意義上的 ”推模式“,因此,在 push 模式下,消費者拉取完訊息後,立馬就有開始下一個拉取任務,並不會真的等 20s 重平衡後才拉取,至於 push 模式是怎麼實現的,那就從原始碼去找答案。
之前有寫過一篇文章:「RocketMQ為什麼要保證訂閱關係的一致性?」,裡面有說過 訊息拉取是從 PullRequestQueue 阻塞佇列中取出 PullRequest 拉取任務進行訊息拉取的,但 PullRequest 是怎麼放進 PullRequestQueue 阻塞佇列中的呢?
RocketMQ 一共提供了以下方法:
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately:
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
從呼叫鏈發現,除了重平衡會呼叫該方法之外,在 push 模式下,PullCallback 回撥物件中的 onSuccess 方法在訊息消費時,也呼叫了該方法:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
case FOUND:
// 如果本次拉取訊息為空,則繼續將pullRequest放入阻塞佇列中
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
// 將訊息放入消費者消費執行緒去執行
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
// 將pullRequest放入阻塞佇列中
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
當從 broker 拉取到訊息後,如果訊息被過濾掉,則繼續將pullRequest放入阻塞佇列中繼續迴圈執行訊息拉取任務,否則將訊息放入消費者消費執行緒去執行,在pullRequest放入阻塞佇列中。
case NO_NEW_MESSAGE:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
如果從 broker 端沒有可拉取的新訊息或者沒有匹配到訊息,則將pullRequest放入阻塞佇列中繼續迴圈執行訊息拉取任務。
從以上訊息消費邏輯可以看出,當訊息處理完後,立即將 pullRequest 重新放入阻塞佇列中,因此這就很好解釋為什麼 push 模式可以持續拉取訊息了:
在 push 模式下訊息消費完後,還會呼叫該方法重新將 PullRequest 物件放進 PullRequestQueue 阻塞佇列中,不斷地從 broker 中拉取訊息,實現 push 效果。
重平衡後佇列被其它消費者分配後如何處理?
繼續再想一個問題,如果重平衡後,發現某個佇列被新的消費者分配了,怎麼辦,總不能繼續從該佇列中拉取訊息吧?
RocketMQ 重平衡後會檢查 pullRequest 是否還在新分配的列表中,如果不在,則丟棄,呼叫 isDrop() 可查出該pullRequest是否已丟棄:
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage:
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
在訊息拉取之前,首先判斷該佇列是否被丟棄,如果已丟棄,則直接放棄本次拉取任務。
那什麼時候佇列被丟棄呢?
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
// 判斷當前快取 MessageQueue 是否包含在最新的 mqSet 中,如果不存在則將佇列丟棄
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
// 如果佇列拉取過期則丟棄
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
updateProcessQueueTableInRebalance 方法在重平衡時執行,用於更新 processQueueTable,它是當前消費者的佇列快取列表,以上方法邏輯判斷當前快取 MessageQueue 是否包含在最新的 mqSet 中,如果不包含其中,則說明經過這次重平衡後,該佇列被分配給其它消費者了,或者拉取時間間隔太大過期了,則呼叫 setDropped(true) 方法將佇列置為丟棄狀態。
可能你會問,processQueueTable 跟 pullRequest 裡面 processQueue 有什麼關聯,往下看:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:
// 新建 ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// 將ProcessQueue放入processQueueTable中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
// 將ProcessQueue放入pullRequest拉取任務物件中
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
可以看出,重平衡時會建立 ProcessQueue 物件,將其放入 processQueueTable 快取隊列表中,再將其放入 pullRequest 拉取任務物件中,也就是 processQueueTable 中的 ProcessQueue 與 pullRequest 的中 ProcessQueue 是同一個物件。
重平衡後會導致訊息重複消費嗎?
之前在群裡有個網友提了這個問題:
我當時回答他 RocketMQ 正常也是沒有重複消費,但後來發現其實 RocketMQ 在某些情況下,也是會出現訊息重複消費的現象。
前面講到,RocketMQ 訊息消費時,會將訊息放進消費執行緒中去執行,程式碼如下:
org.apache.rocketmq.client.consumer.PullCallback#onSuccess:
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);
ConsumeMessageService 類實現訊息消費的邏輯,它有兩個實現類:
// 併發訊息消費邏輯實現類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 順序訊息消費邏輯實現類
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
先看併發訊息消費相關處理邏輯:
ConsumeMessageConcurrentlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run:
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// 訊息消費邏輯
// ...
// 如果佇列被設定為丟棄狀態,則不提交訊息消費進度
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
ConsumeRequest 是一個繼承了 Runnable 的類,它是訊息消費核心邏輯的實現類,submitConsumeRequest 方法將 ConsumeRequest 放入 消費執行緒池中執行訊息消費,從它的 run 方法中可看出,如果在執行訊息消費邏輯中有節點加入,重平衡後該佇列被分配給其它節點進行消費了,此時的佇列被丟棄,則不提交訊息消費進度,因為之前已經消費了,此時就會造成訊息重複消費的情況。
再來看看順序消費相關處理邏輯:
ConsumeMessageOrderlyService:
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run:
public void run() {
// 判斷佇列是否被丟棄
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 如果不是廣播模式,且佇列已加鎖且鎖沒有過期
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
// 再次判斷佇列是否被丟棄
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 訊息消費處理邏輯
// ...
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
RocketMQ 順序訊息消費會將佇列鎖定,當佇列獲取鎖之後才能進行消費,所以,即使訊息在消費過程中有節點加入,重平衡後該佇列被分配給其它節點進行消費了,此時的佇列被丟棄,依然不會造成重複消費。
更多精彩文章請關注作者維護的公眾號「後端進階」,這是一個專注後端相關技術的公眾號。
關注公眾號並回復「後端」免費領取後端相關電子書籍。
歡迎分享,轉載請保留出處。