A look at RocketMQ's consumeMessageBatchMaxSize
Preface
This article looks at RocketMQ's consumeMessageBatchMaxSize setting.
consumeMessageBatchMaxSize
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    private final InternalLogger log = ClientLogger.getLog();
    //......
    /**
     * Batch consumption size
     */
    private int consumeMessageBatchMaxSize = 1;

    public int getConsumeMessageBatchMaxSize() {
        return consumeMessageBatchMaxSize;
    }

    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }
    //......
}
- DefaultMQPushConsumer defines the consumeMessageBatchMaxSize property, which defaults to 1
checkConfig
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    //......
    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
        //......
        // consumeMessageBatchMaxSize
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
            || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException(
                "consumeMessageBatchMaxSize Out of range [1,1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
        }
        //......
    }
    //......
}
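- The checkConfig method validates defaultMQPushConsumer.getConsumeMessageBatchMaxSize(): the value must be greater than or equal to 1 and less than or equal to 1024, otherwise an MQClientException is thrown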
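The range check above can be tried out in isolation with a minimal sketch. The class name BatchSizeCheckSketch and its methods are hypothetical and not part of rocketmq-client; IllegalArgumentException stands in for MQClientException so that the snippet compiles without the RocketMQ dependency.

```java
// Hypothetical stand-alone sketch of the [1,1024] range check; not RocketMQ code.
final class BatchSizeCheckSketch {
    static final int MIN_BATCH_SIZE = 1;     // lower bound used by checkConfig
    static final int MAX_BATCH_SIZE = 1024;  // upper bound used by checkConfig

    // True when the value would pass the range check.
    static boolean inRange(int consumeMessageBatchMaxSize) {
        return consumeMessageBatchMaxSize >= MIN_BATCH_SIZE
            && consumeMessageBatchMaxSize <= MAX_BATCH_SIZE;
    }

    // Throws for out-of-range values; IllegalArgumentException is an assumption
    // made here in place of MQClientException.
    static void check(int consumeMessageBatchMaxSize) {
        if (!inRange(consumeMessageBatchMaxSize)) {
            throw new IllegalArgumentException(
                "consumeMessageBatchMaxSize Out of range [1,1024]: " + consumeMessageBatchMaxSize);
        }
    }
}
```

With this sketch, check(32) returns normally, while check(0) and check(1025) both throw.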
submitConsumeRequest
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    //......
    public void submitConsumeRequest(
        final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }
    //......
}
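- When msgs.size() is less than or equal to consumeBatchSize, submitConsumeRequest creates a single ConsumeRequest and submits it to consumeExecutor; if a RejectedExecutionException is thrown, it calls submitConsumeRequestLater. When msgs.size() is greater than consumeBatchSize, it creates one ConsumeRequest per batch of consumeBatchSize messages and submits each to consumeExecutor; if a RejectedExecutionException is thrown, it adds the remaining messages to msgThis and then calls submitConsumeRequestLater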
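To make the batching behaviour concrete, here is a minimal stand-alone sketch of the same splitting logic. BatchSplitSketch, split and the accept predicate are hypothetical names introduced for illustration only: accept returning false plays the role of consumeExecutor throwing RejectedExecutionException, and the returned list simply records which batches would have been handed over (either to the executor or to submitConsumeRequestLater). It does not use any RocketMQ types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-alone sketch of the batching in submitConsumeRequest; not RocketMQ code.
final class BatchSplitSketch {
    // Returns the batches in the order they would be handed over.
    // accept.test(batch) == false simulates a RejectedExecutionException on submit.
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize, Predicate<List<T>> accept) {
        List<List<T>> batches = new ArrayList<>();
        if (msgs.size() <= consumeBatchSize) {
            // Small enough: everything goes out as one request
            // (rejection handling for this branch is omitted in the sketch).
            batches.add(new ArrayList<>(msgs));
            return batches;
        }
        int total = 0;
        while (total < msgs.size()) {
            List<T> msgThis = new ArrayList<>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize && total < msgs.size(); i++, total++) {
                msgThis.add(msgs.get(total));
            }
            if (!accept.test(msgThis)) {
                // On rejection the current batch absorbs all remaining messages,
                // so the loop ends after this batch.
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
            }
            batches.add(msgThis);
        }
        return batches;
    }
}
```

For example, 32 messages with a batch size of 10 and no rejection give four batches of 10, 10, 10 and 2 messages. If the second submission is rejected, the result is two batches of 10 and 22 messages, which means a retried batch can be larger than consumeMessageBatchMaxSize.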
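Summary
DefaultMQPushConsumer defines the consumeMessageBatchMaxSize property, which defaults to 1. The checkConfig method of DefaultMQPushConsumerImpl validates defaultMQPushConsumer.getConsumeMessageBatchMaxSize() and requires the value to be greater than or equal to 1 and less than or equal to 1024. The submitConsumeRequest method of ConsumeMessageConcurrentlyService creates a single ConsumeRequest and submits it to consumeExecutor when msgs.size() is less than or equal to consumeBatchSize, calling submitConsumeRequestLater if a RejectedExecutionException is thrown. When msgs.size() is greater than consumeBatchSize, it creates one ConsumeRequest per batch of consumeBatchSize messages and submits each to consumeExecutor; if a RejectedExecutionException is thrown, it adds the remaining messages to msgThis and then calls submitConsumeRequestLater.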