A Look at RocketMQ's maxMessageSize
阿新 • Published: 2019-12-31
Preface
This article takes a look at how RocketMQ handles maxMessageSize on both the client and the broker.
DefaultMQProducer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer {
    private final InternalLogger log = ClientLogger.getLog();
    //......
    /**
     * Maximum allowed message size in bytes.
     */
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

    public int getMaxMessageSize() {
        return maxMessageSize;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        MessageBatch msgBatch;
        try {
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }
    //......
}
- DefaultMQProducer defines maxMessageSize, which defaults to 4 MB (1024 * 1024 * 4 bytes); both the send method and the batch method call Validators.checkMessage(message, this) to validate the message.
Validators
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/Validators.java
public class Validators {
    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
    public static final int CHARACTER_MAX_LENGTH = 255;
    //......
    /**
     * Validate message
     */
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // topic
        Validators.checkTopic(msg.getTopic());
        // body
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }
    //......
}
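- checkMessage first calls checkTopic, then checks whether msg.getBody() is null or has length 0, and finally checks whether the body length is greater than defaultMQProducer.getMaxMessageSize(); any failed check throws an MQClientException.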
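The order of the body checks described above, and in particular the strict `>` comparison, can be mirrored in a small standalone sketch. It does not depend on the RocketMQ client: `BodySizeCheck`, `check` and `accepts` are hypothetical names, and `IllegalArgumentException` stands in for `MQClientException`. The topic check is left out.

```java
public class BodySizeCheck {
    // Same default value as DefaultMQProducer.maxMessageSize: 4 MB in bytes.
    public static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4;

    // Hypothetical stand-in for the body checks in Validators.checkMessage.
    public static void check(byte[] body, int maxMessageSize) {
        if (body == null) {
            throw new IllegalArgumentException("the message body is null");
        }
        if (body.length == 0) {
            throw new IllegalArgumentException("the message body length is zero");
        }
        // Strictly greater than: a body of exactly maxMessageSize bytes is allowed.
        if (body.length > maxMessageSize) {
            throw new IllegalArgumentException(
                "the message body size over max value, MAX: " + maxMessageSize);
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    public static boolean accepts(byte[] body, int maxMessageSize) {
        try {
            check(body, maxMessageSize);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int max = DEFAULT_MAX_MESSAGE_SIZE;
        System.out.println(accepts(new byte[max], max));     // body exactly at the limit
        System.out.println(accepts(new byte[max + 1], max)); // body one byte over the limit
    }
}
```

Under these assumptions a body of exactly 4 MB is accepted and one that is a single byte larger is rejected, which matches the `>` comparison in checkMessage.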
MessageStoreConfig
rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";
    //......
    // The maximum size of message, default is 4M
    private int maxMessageSize = 1024 * 1024 * 4;

    public int getMaxMessageSize() {
        return maxMessageSize;
    }

    public void setMaxMessageSize(int maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }
    //......
}
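- MessageStoreConfig defines the maxMessageSize property, which defaults to 4 MB (1024 * 1024 * 4 bytes).
To override it, set the property in conf/broker.conf under the RocketMQ installation directory, for example maxMessageSize=65536 (64 KB).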
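To make the "configured value or 4 MB default" behaviour concrete, here is a minimal sketch that reads maxMessageSize from key=value text using only `java.util.Properties`. It assumes broker.conf is plain properties-style text; `BrokerConfSketch` and `readMaxMessageSize` are hypothetical names for illustration and are not part of RocketMQ, and the sample `brokerName` line is only filler.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class BrokerConfSketch {
    // Same default value as MessageStoreConfig.maxMessageSize: 4 MB in bytes.
    public static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4;

    // Hypothetical helper: returns the configured maxMessageSize,
    // or the 4 MB default when the key is absent.
    public static int readMaxMessageSize(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty("maxMessageSize");
        return raw == null ? DEFAULT_MAX_MESSAGE_SIZE : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        // Sample content; only the maxMessageSize line matters here.
        String withOverride = "brokerName=broker-a\nmaxMessageSize=65536\n";
        String withoutOverride = "brokerName=broker-a\n";
        System.out.println(readMaxMessageSize(withOverride));
        System.out.println(readMaxMessageSize(withoutOverride));
    }
}
```

With the override present the sketch yields 65536; without it, it falls back to 4194304.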
DefaultMessageStore
rocketmq/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
public class DefaultMessageStore implements MessageStore {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final MessageStoreConfig messageStoreConfig;
    // CommitLog
    private final CommitLog commitLog;
    //......
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        if (this.shutdown) {
            log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
            log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }
    //......
}
- The putMessages method of DefaultMessageStore checks whether messageExtBatch.getBody().length is greater than messageStoreConfig.getMaxMessageSize(); if it is, the method returns PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null).
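Because the broker compares the body of the whole batch against maxMessageSize, a large collection of messages may need to be split before sending. The following is only a sketch of one way to do that, using a greedy grouping by estimated size. `BatchSplitSketch` and `split` are hypothetical names, and `perMessageOverhead` is an assumed allowance for encoding overhead, not a RocketMQ constant; the real encoded size of a batch depends on each message's topic and properties.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    // Hypothetical helper: groups bodies in order so that each group's
    // estimated size (body length + perMessageOverhead per message)
    // does not exceed maxBatchBytes.
    public static List<List<byte[]>> split(List<byte[]> bodies, int maxBatchBytes, int perMessageOverhead) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentSize = 0;
        for (byte[] body : bodies) {
            long size = (long) body.length + perMessageOverhead;
            if (size > maxBatchBytes) {
                // A single message that is too large can never fit in any batch.
                throw new IllegalArgumentException("single message exceeds limit: " + size);
            }
            if (currentSize + size > maxBatchBytes) {
                // Close the current group and start a new one.
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            bodies.add(new byte[40]);
        }
        // Three 40-byte bodies against a 100-byte limit, with and without overhead.
        System.out.println(split(bodies, 100, 0).size());
        System.out.println(split(bodies, 100, 20).size());
    }
}
```

Each resulting group could then be sent as its own batch. The limit passed in should be no larger than the smaller of the producer and broker settings.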
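Summary
RocketMQ checks whether the message body exceeds maxMessageSize on both the client and the broker. On the client, DefaultMQProducer defines maxMessageSize with a default of 4 MB, and both the send and batch methods call Validators.checkMessage(message, this) to validate the message. On the broker, maxMessageSize can be set in conf/broker.conf. If you need to change maxMessageSize, change it on the client and the broker together; otherwise the side with the smaller limit will still reject the message.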
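The need to change both sides together can be illustrated with a simplified model. The sketch below compares only the body length against two limits, in the order a message would meet them; `EffectiveLimitSketch`, `effectiveLimit` and `rejectedBy` are hypothetical names, and the real broker-side checks involve more than this single comparison.

```java
public class EffectiveLimitSketch {
    // The largest body that passes both checks is bounded by the smaller limit.
    public static int effectiveLimit(int producerMax, int brokerMax) {
        return Math.min(producerMax, brokerMax);
    }

    // Simplified model: the client check runs first, then the broker check.
    // Returns which side would reject a body of the given length, or "none".
    public static String rejectedBy(int bodyLength, int producerMax, int brokerMax) {
        if (bodyLength > producerMax) {
            return "client";
        }
        if (bodyLength > brokerMax) {
            return "broker";
        }
        return "none";
    }

    public static void main(String[] args) {
        int fourMb = 1024 * 1024 * 4;
        int eightMb = 1024 * 1024 * 8;
        int sixMbBody = 1024 * 1024 * 6;
        // Raising only the producer limit: the broker still rejects the message.
        System.out.println(rejectedBy(sixMbBody, eightMb, fourMb));
        // Raising only the broker limit: the client rejects it before sending.
        System.out.println(rejectedBy(sixMbBody, fourMb, eightMb));
        // Raising both: the message passes.
        System.out.println(rejectedBy(sixMbBody, eightMb, eightMb));
        System.out.println(effectiveLimit(eightMb, fourMb));
    }
}
```

In this model a 6 MB body is accepted only when both limits have been raised above 6 MB.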