A look at RocketMQ's maxMessageSize

This article looks at how RocketMQ uses maxMessageSize.

DefaultMQProducer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Maximum allowed message size in bytes.
     */
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

    public int getMaxMessageSize() {
        return maxMessageSize;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        MessageBatch msgBatch;
        try {
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

    //......
}
  • DefaultMQProducer defines maxMessageSize, which defaults to 4M (1024 * 1024 * 4 bytes); both the send method and the batch method call Validators.checkMessage(message, this) to validate the message

Validators

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/Validators.java

public class Validators {
    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
    public static final int CHARACTER_MAX_LENGTH = 255;

    //......

    /**
     * Validate message
     */
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // topic
        Validators.checkTopic(msg.getTopic());

        // body
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }

        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }

        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }

    //......
}
  • checkMessage first calls checkTopic, then checks that msg.getBody() is neither null nor empty, and finally checks whether the body length exceeds defaultMQProducer.getMaxMessageSize(); any failed check throws an MQClientException
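The order of the body checks can be sketched in plain Java without the RocketMQ client on the classpath. This is only an illustration: BodySizeCheck and reject are hypothetical names, and the method returns a reason string where the real code throws MQClientException. Because the comparison is a strict "greater than", a body whose length equals maxMessageSize still passes.

```java
/** Hypothetical stand-in for the body checks in Validators.checkMessage; not RocketMQ code. */
final class BodySizeCheck {
    /** Same default as DefaultMQProducer.maxMessageSize: 4M, expressed in bytes. */
    static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4;

    /** Returns null when the body passes, otherwise the reason it is rejected. */
    static String reject(byte[] body, int maxMessageSize) {
        if (body == null) {
            return "the message body is null";
        }
        if (body.length == 0) {
            return "the message body length is zero";
        }
        // Strictly greater than: a body of exactly maxMessageSize bytes is accepted.
        if (body.length > maxMessageSize) {
            return "the message body size over max value, MAX: " + maxMessageSize;
        }
        return null;
    }

    public static void main(String[] args) {
        int max = DEFAULT_MAX_MESSAGE_SIZE;
        System.out.println("at limit:   " + reject(new byte[max], max));
        System.out.println("over limit: " + reject(new byte[max + 1], max));
        System.out.println("empty body: " + reject(new byte[0], max));
    }
}
```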

MessageStoreConfig

rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";

    //......

    // The maximum size of message, default is 4M
    private int maxMessageSize = 1024 * 1024 * 4;

    public int getMaxMessageSize() {
        return maxMessageSize;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    //......
}    
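  • MessageStoreConfig defines the maxMessageSize property, which defaults to 4M

On the broker, the limit can be overridden in conf/broker.conf under the RocketMQ installation directory, for example maxMessageSize=65536 (the value is in bytes, so this is 64 KB)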
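To make the unit concrete, here is a minimal sketch that reads a maxMessageSize entry from properties-style text and falls back to the 4M default when the key is absent. BrokerConfSketch is a hypothetical helper written for this article and is not the broker's own loading code; the brokerName line is only filler to make the sample text look like a config file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: reads maxMessageSize from properties-style text. Not RocketMQ code. */
final class BrokerConfSketch {
    /** Same default as MessageStoreConfig.maxMessageSize: 4M, expressed in bytes. */
    static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4;

    static int maxMessageSize(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked to keep the sketch simple.
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty("maxMessageSize");
        return raw == null ? DEFAULT_MAX_MESSAGE_SIZE : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        String conf = "brokerName=broker-a\nmaxMessageSize=65536\n";
        int bytes = maxMessageSize(conf);
        System.out.println(bytes + " bytes = " + (bytes / 1024) + " KB");
        System.out.println("default: " + maxMessageSize("brokerName=broker-a\n") + " bytes");
    }
}
```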

DefaultMessageStore

rocketmq/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

public class DefaultMessageStore implements MessageStore {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final MessageStoreConfig messageStoreConfig;
    // CommitLog
    private final CommitLog commitLog;

    //......

    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        if (this.shutdown) {
            log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
            log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);

        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

    //......
}
  • DefaultMessageStore's putMessages method checks whether messageExtBatch.getBody().length is greater than messageStoreConfig.getMaxMessageSize(); if it is, the method logs a warning and returns PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)

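Summary

RocketMQ checks whether the message body exceeds maxMessageSize on both the client and the broker. On the client, DefaultMQProducer defines maxMessageSize with a default of 4M, and both the send method and the batch method call Validators.checkMessage(message, this) to validate the message. On the broker, maxMessageSize can be set in conf/broker.conf. Because the two limits are configured independently, changing maxMessageSize on the client only helps if the broker is changed to match.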
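Why the two settings have to move together can be sketched as follows. This is a simplified model rather than RocketMQ code: TwoSidedLimitSketch, Outcome, and send are hypothetical names, and only the two size comparisons shown earlier are modeled. The limit that actually applies is the smaller of the two values.

```java
/** Simplified model of the client-side and broker-side size checks; not RocketMQ code. */
final class TwoSidedLimitSketch {
    enum Outcome { ACCEPTED, REJECTED_BY_CLIENT, REJECTED_BY_BROKER }

    /** Applies the client check first, then the broker check, as a message would meet them. */
    static Outcome send(int bodyLength, int clientMax, int brokerMax) {
        if (bodyLength > clientMax) {
            return Outcome.REJECTED_BY_CLIENT; // corresponds to MQClientException in the producer
        }
        if (bodyLength > brokerMax) {
            return Outcome.REJECTED_BY_BROKER; // corresponds to PutMessageStatus.MESSAGE_ILLEGAL
        }
        return Outcome.ACCEPTED;
    }

    /** The largest body that passes both checks. */
    static int effectiveLimit(int clientMax, int brokerMax) {
        return Math.min(clientMax, brokerMax);
    }

    public static void main(String[] args) {
        int clientMax = 1024 * 1024 * 4; // client default, 4M
        int brokerMax = 65536;           // broker.conf value used earlier in the article
        int body = 100 * 1024;           // a 100 KB body

        System.out.println(send(body, clientMax, brokerMax));
        System.out.println("effective limit: " + effectiveLimit(clientMax, brokerMax) + " bytes");
    }
}
```

With these values the 100 KB body passes the client check but is rejected by the broker, which is the mismatch the summary warns about.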
