
A look at rocketmq's consumeMessageBatchMaxSize

This article examines rocketmq's consumeMessageBatchMaxSize setting.

consumeMessageBatchMaxSize

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Batch consumption size
     */
    private int consumeMessageBatchMaxSize = 1;

    public int getConsumeMessageBatchMaxSize() {
        return consumeMessageBatchMaxSize;
    }

    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }

    //......
}
  • DefaultMQPushConsumer defines the consumeMessageBatchMaxSize property, which defaults to 1

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

	//......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // consumeMessageBatchMaxSize
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
            || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException(
                "consumeMessageBatchMaxSize Out of range [1,1024]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        //......
    }

    //......
}
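  • The checkConfig method validates defaultMQPushConsumer.getConsumeMessageBatchMaxSize(): the value must be greater than or equal to 1 and less than or equal to 1024, otherwise an MQClientException is thrown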
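
The range rule above can be illustrated in isolation. The following is a minimal stand-alone sketch, not the RocketMQ client code: the class name BatchSizeCheck, its constants, and the use of IllegalArgumentException in place of MQClientException are all assumptions made so the example runs without the rocketmq-client dependency.

```java
// Hypothetical stand-alone sketch of the [1, 1024] range check.
// BatchSizeCheck and its members are illustrative names, not RocketMQ APIs.
final class BatchSizeCheck {
    static final int MIN_BATCH_SIZE = 1;
    static final int MAX_BATCH_SIZE = 1024;

    /** Returns true when the value lies in the closed range [1, 1024]. */
    static boolean isValid(int consumeMessageBatchMaxSize) {
        return consumeMessageBatchMaxSize >= MIN_BATCH_SIZE
            && consumeMessageBatchMaxSize <= MAX_BATCH_SIZE;
    }

    /** Fails fast on out-of-range values, similar in spirit to checkConfig. */
    static void check(int consumeMessageBatchMaxSize) {
        if (!isValid(consumeMessageBatchMaxSize)) {
            // Assumption: IllegalArgumentException stands in for MQClientException.
            throw new IllegalArgumentException(
                "consumeMessageBatchMaxSize Out of range [1,1024]: "
                    + consumeMessageBatchMaxSize);
        }
    }

    public static void main(String[] args) {
        check(1);     // lower bound, accepted
        check(1024);  // upper bound, accepted
        try {
            check(0); // below the lower bound, rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Both bounds are inclusive, so 1 and 1024 pass while 0 and 1025 are rejected.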

submitConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

	//......

    public void submitConsumeRequest(
        final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

    //......
}
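  • When msgs.size() is less than or equal to consumeBatchSize, submitConsumeRequest creates a single ConsumeRequest and submits it to consumeExecutor; if a RejectedExecutionException is thrown, it calls submitConsumeRequestLater. When msgs.size() is greater than consumeBatchSize, it creates ConsumeRequests in batches of consumeBatchSize and submits each one to consumeExecutor; if a RejectedExecutionException is thrown, it adds all remaining messages to msgThis and then calls submitConsumeRequestLater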
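
To make the splitting and rejection behaviour easier to follow, here is a simplified stand-alone sketch. It is not the RocketMQ implementation: BatchSplitSketch, Result, dispatch, and the accepts predicate (which stands in for consumeExecutor accepting or rejecting a task) are hypothetical names, and the separate msgs.size() <= consumeBatchSize branch is folded into the same loop, so an empty input produces no batch here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the batching logic; names are illustrative only.
final class BatchSplitSketch {

    /** Batches handed to the executor versus batches deferred for a later retry. */
    static final class Result<T> {
        final List<List<T>> submitted = new ArrayList<>();
        final List<List<T>> deferred = new ArrayList<>();
    }

    /**
     * Splits msgs into batches of at most batchSize elements.
     * accepts models the executor: returning false simulates a
     * RejectedExecutionException for that batch.
     */
    static <T> Result<T> dispatch(List<T> msgs, int batchSize, Predicate<List<T>> accepts) {
        Result<T> result = new Result<>();
        for (int total = 0; total < msgs.size(); ) {
            List<T> msgThis = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize && total < msgs.size(); i++, total++) {
                msgThis.add(msgs.get(total));
            }
            if (accepts.test(msgThis)) {
                result.submitted.add(msgThis);
            } else {
                // Rejected: fold every remaining message into this batch and defer it,
                // which also ends the outer loop because total reaches msgs.size().
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                result.deferred.add(msgThis);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> msgs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        Result<Integer> ok = dispatch(msgs, 3, batch -> true);
        System.out.println("submitted: " + ok.submitted + ", deferred: " + ok.deferred);
        // Simulate the executor rejecting the batch that starts with message 4.
        Result<Integer> rejected = dispatch(msgs, 3, batch -> batch.get(0) != 4);
        System.out.println("submitted: " + rejected.submitted + ", deferred: " + rejected.deferred);
    }
}
```

One consequence visible in the sketch is that a deferred batch can be larger than the configured batch size, because the rejected batch absorbs all messages that have not yet been dispatched.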

Summary

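DefaultMQPushConsumer defines the consumeMessageBatchMaxSize property, which defaults to 1. The checkConfig method of DefaultMQPushConsumerImpl validates defaultMQPushConsumer.getConsumeMessageBatchMaxSize() and requires the value to be greater than or equal to 1 and less than or equal to 1024. The submitConsumeRequest method of ConsumeMessageConcurrentlyService creates a single ConsumeRequest when msgs.size() is less than or equal to consumeBatchSize and submits it to consumeExecutor, calling submitConsumeRequestLater if a RejectedExecutionException is thrown. When msgs.size() is greater than consumeBatchSize, it creates ConsumeRequests in batches of consumeBatchSize and submits each one to consumeExecutor; if a RejectedExecutionException is thrown, it adds the remaining messages to msgThis and then calls submitConsumeRequestLater.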

doc