聊聊rocketmq的suggestPullingFromSlave
阿新 • • 發佈:2020-06-24
序
本文主要研究一下rocketmq的suggestPullingFromSlave
suggestPullingFromSlave
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
public class GetMessageResult {
private final List<SelectMappedBufferResult> messageMapedList =
new ArrayList<SelectMappedBufferResult>(100);
private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
private GetMessageStatus status;
private long nextBeginOffset;
private long minOffset;
private long maxOffset;
private int bufferTotalSize = 0;
private boolean suggestPullingFromSlave = false ;
private int msgCount4Commercial = 0;
//......
public boolean isSuggestPullingFromSlave() {
return suggestPullingFromSlave;
}
public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) {
this.suggestPullingFromSlave = suggestPullingFromSlave;
}
//......
}
複製程式碼
- GetMessageResult定義了suggestPullingFromSlave屬性,預設為false
getMessage
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
public class DefaultMessageStore implements MessageStore {
//......
public GetMessageResult getMessage(final String group,final String topic,final int queueId,final long offset,final int maxMsgNums,final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown,so getMessage is forbidden");
return null;
}
if (!this.runningFlags.isReadable()) {
log.warn("message store is not readable,so getMessage is forbidden " + this.runningFlags.getFlagBits());
return null;
}
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
final long maxOffsetPy = this.commitLog.getMaxOffset();
ConsumeQueue consumeQueue = findConsumeQueue(topic,queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset,0);
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset,minOffset);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset,offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset,minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset,maxOffset);
}
} else {
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000,maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
//......
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
//......
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset,consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ",but access logic queue failed.");
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset,0);
}
if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
//......
}
複製程式碼
- DefaultMessageStore的getMessage會計算diff(
maxOffsetPy - maxPhyOffsetPulling
),然後同memory(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)
)對比,大於則設定getResult的suggestPullingFromSlave為true
processRequest
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
public class PullMessageProcessor implements NettyRequestProcessor {
//......
private RemotingCommand processRequest(final Channel channel,RemotingCommand request,boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
//......
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getQueueOffset(),requestHeader.getMaxMsgNums(),messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
//......
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(),requestHeader.getCommitOffset());
}
return response;
}
//......
}
複製程式碼
- processRequest方法會通過brokerController.getMessageStore().getMessage方法獲取getMessageResult,之後判斷getMessageResult.isSuggestPullingFromSlave()是否為true,是的話則執行responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()),否則執行responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID)
小結
DefaultMessageStore的getMessage會計算diff(maxOffsetPy - maxPhyOffsetPulling
),然後同memory(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)
)對比,大於則設定getResult的suggestPullingFromSlave為true