Rocketmq消息持久化
阿新 • • 發佈:2017-06-27
res producer .net class mem mapped 並不是 net strac
本文編寫,參考:https://my.oschina.net/bieber/blog/725646
producer Send()的Message最終將由broker處理,處理類為:SendMessageProcessor ,處理方法:processRequet.
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {}
上述方法,並不是直接處理消息,而是交由MessageStore處理,相關代碼如下:
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
//......
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
然而MessageStore也不直接持久化消息,轉交給 CommitLog
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
從MappedFileQueue中取出最新的一條:
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//寫消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
//持久化到磁盤
handleDiskFlush(result, putMessageResult, messageExtBatch);
handleHA(result, putMessageResult, messageExtBatch);
cousumer 從broker讀消息。
Rocketmq消息持久化