rocketmq學習筆記 五 原始碼之rocketmq-store
阿新 • • 發佈:2019-02-08
因為broker東西比較多,所以放到最後。今天來學習下 rocketmq-store
核心流程
問題
1.看看訊息如何做的持久化
2.看看如何做的主從同步
config 儲存的配置資訊
public enum BrokerRole {
ASYNC_MASTER,
SYNC_MASTER,
SLAVE;
}
public enum FlushDiskType {
SYNC_FLUSH,
ASYNC_FLUSH
}
/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.rocketmq.store.config; import com.alibaba.rocketmq.common.annotation.ImportantField; import com.alibaba.rocketmq.store.ConsumeQueue; import java.io.File; /** * @author vongosling * @author shijia.wxr */ public class MessageStoreConfig { //The root directory in which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; //The directory in which the commitlog is kept @ImportantField private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; // CommitLog file size,default is 1G private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size, default is 30W private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQStoreUnitSize; // CommitLog flush interval @ImportantField private int flushIntervalCommitLog = 1000; // Whether schedule flush,default is real-time @ImportantField private boolean flushCommitLogTimed = false; // ConsumeQueue flush interval private int flushIntervalConsumeQueue = 1000; // Resource reclaim interval private int cleanResourceInterval = 10000; // CommitLog removal interval private int deleteCommitLogFilesInterval = 100; // ConsumeQueue removal interval private int deleteConsumeQueueFilesInterval = 100; private int destroyMapedFileIntervalForcibly = 1000 * 120; private int redeleteHangedFileInterval = 1000 * 120; // When to delete,default is at 4 am @ImportantField private String deleteWhen = "04"; private int diskMaxUsedSpaceRatio = 75; // The number of hours to keep a log file before deleting it (in hours) @ImportantField private int fileReservedTime = 72; // Flow control for ConsumeQueue private int putMsgIndexHightWater = 600000; // The maximum size of a single log file,default is 512K private int maxMessageSize = 1024 * 1024 * 4; // Whether check the CRC32 of the records consumed. // This ensures no on-the-wire or on-disk corruption to the messages occurred. // This check adds some overhead, so it may be disabled in cases seeking extreme performance. private boolean checkCRCOnRecover = true; // How many pages are to be flushed when flush CommitLog private int flushCommitLogLeastPages = 4; // Flush page size when the disk in warming state private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16; // How many pages are to be flushed when flush ConsumeQueue private int flushConsumeQueueLeastPages = 2; private int flushCommitLogThoroughInterval = 1000 * 10; private int flushConsumeQueueThoroughInterval = 1000 * 60; @ImportantField private int maxTransferBytesOnMessageInMemory = 1024 * 256; @ImportantField private int maxTransferCountOnMessageInMemory = 32; @ImportantField private int maxTransferBytesOnMessageInDisk = 1024 * 64; @ImportantField private int maxTransferCountOnMessageInDisk = 8; @ImportantField private int accessMessageInMemoryMaxRatio = 40; @ImportantField private boolean messageIndexEnable = true; private int maxHashSlotNum = 5000000; private int maxIndexNum = 5000000 * 4; private int maxMsgsNumBatch = 64; @ImportantField private boolean messageIndexSafe = false; private int haListenPort = 10912; private int haSendHeartbeatInterval = 1000 * 5; private int haHousekeepingInterval = 1000 * 20; private int haTransferBatchSize = 1024 * 32; @ImportantField private String haMasterAddress = null; private int haSlaveFallbehindMax = 1024 * 1024 * 256; @ImportantField private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER; @ImportantField private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH; private int syncFlushTimeout = 1000 * 5; private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; private long flushDelayOffsetInterval = 1000 * 10; @ImportantField private boolean cleanFileForciblyEnable = true; private boolean warmMapedFileEnable = false; private boolean offsetCheckInSlave = false; private boolean debugLockEnable = false; private boolean duplicationEnable = false; private boolean diskFallRecorded = true; private long osPageCacheBusyTimeOutMills = 1000; private int defaultQueryMaxNum = 32;
DefaultMessageStore 看下其啟動任務,包含了哪些東西
public void start() throws Exception { this.flushConsumeQueueService.start(); this.commitLog.start(); this.storeStatsService.start(); if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) { this.scheduleMessageService.start(); } if (this.getMessageStoreConfig().isDuplicationEnable()) { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); } else { this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset()); } this.reputMessageService.start(); this.haService.start(); this.createTempFile(); this.addScheduleTask(); this.shutdown = false; }
FlushConsumeQueueService
private void doFlush(int retryTimes) { int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages(); if (retryTimes == RetryTimesOver) { flushConsumeQueueLeastPages = 0; } long logicsMsgTimestamp = 0; int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval(); long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushConsumeQueueLeastPages = 0; logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); } ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { result = cq.commit(flushConsumeQueueLeastPages); } } } if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } DefaultMessageStore.this.getStoreCheckpoint().flush(); } }
CommitLog
StoreStatsService
統計輸出message store的TPS public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
try {
this.waitForRunning(FrequencyOfSampling);
this.sampling();
this.printTps();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
ScheduleMessageService
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Exception e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
ReputMessageService
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode());
}
// FIXED BUG By shijia
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
}
else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
}
else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += (result.getSize() - readSize);
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
HAService
public void start() {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}