RocketMQ Producer and Message Storage
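1. Preface
This article walks through the RocketMQ source code for sending and storing messages. RocketMQ implements three ways of sending a message: reliable synchronous, reliable asynchronous, and one-way. From a storage-model perspective, today's MQ middleware falls into two groups: those that persist messages and those that do not. This article also analyzes RocketMQ's message storage mechanism.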
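2. RocketMQ Messages
Start with the class that wraps a RocketMQ message, org.apache.rocketmq.common.message.Message.
Basic properties: topic, flag, body, and extended properties.
Hidden properties:
- tag: the message tag, used for message filtering
- keys: the message index keys
- waitStoreMsgOK: whether a send waits for the message to be stored before returning
- delayTimeLevel: the message delay level, used for scheduled messages or message retries
These extended properties are all stored in the properties map of Message.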
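To make the "hidden properties live in the properties map" point concrete, here is a minimal sketch. `SimpleMessage` is a hypothetical stand-in, not the real Message class, and the property key strings are assumptions chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for Message; the key names below are assumptions.
class SimpleMessage {
    static final String PROPERTY_TAGS = "TAGS";
    static final String PROPERTY_KEYS = "KEYS";
    static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

    private final String topic;
    private final byte[] body;
    // Every hidden property ends up as a string entry in this map.
    private final Map<String, String> properties = new HashMap<>();

    SimpleMessage(String topic, byte[] body) {
        this.topic = topic;
        this.body = body;
    }

    void setTags(String tags) { properties.put(PROPERTY_TAGS, tags); }
    void setKeys(String keys) { properties.put(PROPERTY_KEYS, keys); }
    void setWaitStoreMsgOK(boolean wait) {
        properties.put(PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(wait));
    }
    void setDelayTimeLevel(int level) {
        properties.put(PROPERTY_DELAY_TIME_LEVEL, Integer.toString(level));
    }

    String getTags() { return properties.get(PROPERTY_TAGS); }
    int getDelayTimeLevel() {
        String level = properties.get(PROPERTY_DELAY_TIME_LEVEL);
        return level == null ? 0 : Integer.parseInt(level); // 0 means "no delay" in this sketch
    }

    String getTopic() { return topic; }
    byte[] getBody() { return body; }
    Map<String, String> getProperties() { return properties; }
}
```

Calling `setTags("TagA")` on such an object adds one entry to the map rather than writing to a dedicated field, which is why these attributes do not show up among the basic properties.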
3. Producer Startup Flow
We trace the startup from the start method of DefaultMQProducerImpl.
Step 1: check that the producerGroup is valid, and change the producer's instanceName to the process ID.
```java
// DefaultMQProducerImpl::start
public void start() throws MQClientException {
    this.start(true); // startFactory defaults to true
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Step 1: validate the config and switch instanceName to the process ID
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Step 2: get or create the MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Step 3: register this producer with the MQClientInstance
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
            }
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // Step 4: start the MQClientInstance
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
```
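Step 2: create the MQClientInstance.
Step 3: register with the MQClientInstance so that the current producer is managed by it, which makes later network requests, heartbeat checks, and so on easier.
Step 4: start the MQClientInstance. If it has already been started, this call does not start it again.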
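4. Basic Message Sending Flow
Sending a message consists of three main steps: validating the message, looking up the route, and sending the message (including exception handling).
Validation mainly checks the message length, so we focus on route lookup and sending.
4.1 Route Lookup
Before a message is sent, the producer first needs the route information for the topic.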
```java
// DefaultMQProducerImpl::tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // Not cached, or cached without usable queues: query the NameServer for this topic
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // Still no route: query again using the default topic
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
```
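If the producer has cached route information for the topic and it contains message queues, that route information is returned directly. If nothing is cached, or the cached entry contains no message queues, the producer queries the NameServer for the topic's route. If the route is still not found (for example, the first time a message is sent to the topic), the producer retries the query using the default topic. If that also fails, an error is raised.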
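The lookup order described above (cache, then NameServer, then default topic, then error) can be sketched independently of the RocketMQ classes. Everything here is hypothetical: `RouteCache`, the `nameServer` function standing in for the remote query, and the use of plain strings for queues.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the cache-then-NameServer lookup; not the RocketMQ API.
class RouteCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // stand-in for a NameServer query
    private final String defaultTopic;

    RouteCache(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> findQueues(String topic) {
        List<String> queues = cache.get(topic);
        if (queues != null && !queues.isEmpty()) {
            return queues; // cache hit with usable queues
        }
        // Cache miss: ask the NameServer for this topic.
        queues = nameServer.apply(topic);
        if (queues == null || queues.isEmpty()) {
            // Still nothing: fall back to the route of the default topic.
            queues = nameServer.apply(defaultTopic);
        }
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("No route info for topic " + topic);
        }
        cache.put(topic, queues);
        return queues;
    }
}
```

With a map-backed `nameServer` that only knows the default topic, `findQueues("NewTopic")` returns the default topic's queues and caches them under `"NewTopic"`, so the next call is served from the cache.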
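4.2 Selecting a Message Queue
A message queue is selected from the route information, whose queues are sorted by broker and then by queue index. Sending uses a retry loop: select a message queue, send the message, return on success, and retry on failure. There are two queue selection strategies.
- sendLatencyFaultEnable=false: the default strategy
- sendLatencyFaultEnable=true: enables the broker fault-latency mechanism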
```java
// MQFaultStrategy::selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```
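For the default strategy (sendLatencyFaultEnable=false), the idea is a round-robin walk over the queue list that skips the broker of the previous failed attempt. The following is a simplified sketch of that idea under my own naming (`QueueSelector`, `Mq`), not the body of TopicPublishInfo::selectOneMessageQueue.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin selection that avoids the last failed broker.
class QueueSelector {
    static final class Mq {
        final String brokerName;
        final int queueId;
        Mq(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger counter = new AtomicInteger();

    Mq select(List<Mq> queues, String lastBrokerName) {
        int n = queues.size();
        for (int i = 0; i < n; i++) {
            // floorMod keeps the index non-negative even after the counter overflows
            Mq candidate = queues.get(Math.floorMod(counter.getAndIncrement(), n));
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        // Every queue lives on the broker that just failed: fall back to plain round-robin.
        return queues.get(Math.floorMod(counter.getAndIncrement(), n));
    }
}
```

On a first attempt `lastBrokerName` is null, so the next queue in order is taken. On a retry the caller passes the broker that just failed, which steers the retry to a different broker whenever one exists.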
4.3 Sending the Message
The core entry point of the sending API is DefaultMQProducerImpl::sendKernelImpl:
```java
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // body omitted
}
```
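Parameters:
- Message msg: the message to send
- MessageQueue mq: the message queue the message will be sent to
- CommunicationMode communicationMode: the sending mode, one of SYNC, ASYNC, or ONEWAY
- SendCallback sendCallback: the callback for asynchronous sends
- TopicPublishInfo topicPublishInfo: the topic's route information
- long timeout: the send timeout
Sending steps:
- Look up the broker's network address from the MessageQueue
- Assign the message a globally unique ID
- If a send-message hook is registered, run its before-send logic
- Build the send request packet
- Transmit over the network in synchronous, asynchronous, or one-way mode, according to the sending mode
- If a send-message hook is registered, run its after-send logic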
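4.3.1 Synchronous Send
The MQ client's entry point for sending a message is MQClientAPIImpl::sendMessage.
Steps for a synchronous send, as handled on the broker once the request arrives:
- Check whether the send request is valid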
```java
// AbstractSendMessageProcessor::msgCheck
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    // Reject when the broker is not writable and the topic is an ordered topic
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }
    // Reject topics that collide with system reserved words
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }
    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // Topic unknown to this broker: try to create it on the fly
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            }
        }
        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(), requestHeader.getDefaultTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }
        }
        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }
    // The queue ID must fall within the topic's configured queue count
    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", queueIdInt, topicConfig.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }
    return response;
}
```
- If the message's retry count exceeds the maximum allowed, the message goes to the DLQ (dead-letter queue)
- Call DefaultMessageStore::putMessage to store the message
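4.3.2 Asynchronous Send
An asynchronous send does not block waiting for the message server to return the send result. The caller only provides a callback, which the sending client invokes when the response arrives. Compared with a synchronous send, this considerably improves the sender's throughput.
4.3.3 One-Way Send
A one-way send neither waits for a result nor takes a callback; the sender simply does not care whether the message was delivered. It works the same way as an asynchronous send, except that the sender never acts on a result.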
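The difference between the three modes comes down to what the caller does with the pending result. The sketch below illustrates this with a `CompletableFuture`; `fakeBroker` is a hypothetical stand-in for the network call, and none of these method names come from the RocketMQ client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch; fakeBroker stands in for the remote call to the broker.
class SendModes {
    static CompletableFuture<String> fakeBroker(String msg) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + msg);
    }

    // Synchronous: block the calling thread until the result is available.
    static String sendSync(String msg) {
        return fakeBroker(msg).join();
    }

    // Asynchronous: return immediately and run the callback when the result arrives.
    static void sendAsync(String msg, Consumer<String> callback) {
        fakeBroker(msg).thenAccept(callback);
    }

    // One-way: fire the request and ignore whatever happens to it.
    static void sendOneway(String msg) {
        fakeBroker(msg);
    }
}
```

`sendSync` is the only variant whose latency the caller pays in full, which is why the asynchronous and one-way modes give the sender higher throughput at the cost of weaker delivery feedback.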
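4.3.4 Batch Send
A batch send packs several messages of the same topic together and sends them to the message server in one request, which reduces the number of network calls and improves throughput.
For a single message, the content is stored in body. For a batch, the contents of several messages must all be stored in one body, so RocketMQ encodes them in a fixed format.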
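To show what "several messages in one body" means in practice, here is a sketch of simple length-prefixed framing. This is an illustration of the general technique only: the per-message layout RocketMQ actually uses is not reproduced here, and `BatchCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pack several bodies into one byte array as [length][bytes] records.
class BatchCodec {
    static byte[] encode(List<byte[]> bodies) {
        int total = 0;
        for (byte[] body : bodies) {
            total += 4 + body.length; // 4-byte length prefix plus payload
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] body : bodies) {
            buf.putInt(body.length);
            buf.put(body);
        }
        return buf.array();
    }

    static List<byte[]> decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<byte[]> bodies = new ArrayList<>();
        while (buf.hasRemaining()) {
            byte[] body = new byte[buf.getInt()];
            buf.get(body);
            bodies.add(body);
        }
        return bodies;
    }
}
```

Because each record carries its own length, the receiver can split the combined body back into individual messages without any delimiter scanning.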
Batch send:
```java
// DefaultMQProducer::send
public SendResult send(
    Collection<Message> msgs) throws MQClientException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}
```
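**Sending flow:** On the sending side, the batch method is called first to wrap a group of messages into a MessageBatch object. MessageBatch internally holds a `List<Message> messages`, so from that point on a batch send follows exactly the same flow as a single-message send.
Tracing through the code: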
```java
// DefaultMQProducer::send
public SendResult send(
    Collection<Message> msgs) throws MQClientException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

// DefaultMQProducer::batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
        msgBatch = MessageBatch.generateFromList(msgs);
        // Validate each message and assign it a unique ID before encoding
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // Encode all messages into the single body of the batch
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

// DefaultMQProducerImpl::send
public SendResult send(
    Message msg) throws MQClientException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
```
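5. Message Storage
Most business systems need the MQ to store messages durably, which greatly improves the system's availability.
First, a look at how data flows through RocketMQ:
- CommitLog: the message storage file; messages of every topic are stored in CommitLog files
- ConsumeQueue: the message consume queue; after a message reaches the CommitLog file, it is asynchronously dispatched to the consume queue for consumers to consume
- IndexFile: the message index file, which mainly stores the mapping between message keys and offsets
- Transaction state service: stores the transaction state of each message
- Scheduled message service: each delay level maps to one consume queue, and the service stores the pull progress of the delay queues
[Figure: RocketMQ storage architecture]
Message store implementation class: org.apache.rocketmq.store.DefaultMessageStore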
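Its core properties:
- MessageStoreConfig messageStoreConfig: the message store configuration
- CommitLog commitLog: the implementation class for CommitLog file storage
- ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable: the cache table of consume queues, grouped by topic
- FlushConsumeQueueService flushConsumeQueueService: the thread that flushes ConsumeQueue files to disk
- CleanCommitLogService cleanCommitLogService: the service that cleans up CommitLog files
- CleanConsumeQueueService cleanConsumeQueueService: the service that cleans up ConsumeQueue files
- IndexService indexService: the index file implementation class
- AllocateMappedFileService allocateMappedFileService: the MappedFile allocation service
- ReputMessageService reputMessageService: dispatches CommitLog messages, building the ConsumeQueue and IndexFile files from the CommitLog file
- HAService haService: the storage HA mechanism
- TransientStorePool transientStorePool: the off-heap memory buffer pool for messages
- MessageArrivingListener messageArrivingListener: the message-arrival listener used by long-polling pulls
- BrokerConfig brokerConfig: the broker configuration
- StoreCheckpoint storeCheckpoint: the flush checkpoint file
- LinkedList<CommitLogDispatcher> dispatcherList: the CommitLog dispatch handlers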
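5.1 Message Store Flow
Entry point for storing a message: org.apache.rocketmq.store.DefaultMessageStore::putMessage
- If the broker has stopped working, the broker is in the SLAVE role, or the broker currently does not accept writes, the write is rejected. The message is also rejected if its topic is longer than 256 characters or its properties are longer than 65536 characters (the exact limits depend on the RocketMQ version)
- Validate the message's delay level
- Get the CommitLog file that can currently be written to
- Acquire putMessageLock before writing to the CommitLog, which means messages are written to the CommitLog file serially
- Set the message's store timestamp
- Append the message to the MappedFile
- Create a globally unique message ID
- Get the message's offset in its message queue
- Compute the total message length from the body length, topic length, and properties length according to the storage format
- If the message length plus END_FILE_MIN_BLANK_LENGTH is greater than the free space left in the CommitLog file, AppendMessageStatus.END_OF_FILE is returned and a new CommitLog file is created to hold the message
- Write the message content into the ByteBuffer, then create the AppendMessageResult
- Update the logical offset of the message queue
- Release putMessageLock once the append logic is done
- DefaultAppendMessageCallback::doAppend only appends the message to memory; depending on whether synchronous or asynchronous flushing is configured, the in-memory data is then persisted to disk
[Figure: simplified sequence diagram of the store flow]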
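Two of the steps above, the serial append under putMessageLock and the end-of-file check, can be sketched in a few lines. This is a toy model under stated assumptions: `AppendSketch` is a hypothetical class, a heap `ByteBuffer` stands in for a mapped CommitLog file, and the value 8 for END_FILE_MIN_BLANK_LENGTH is assumed for the sketch.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a serialized append with an end-of-file check.
class AppendSketch {
    enum Status { PUT_OK, END_OF_FILE }

    static final int END_FILE_MIN_BLANK_LENGTH = 8; // assumed value for this sketch

    private final ReentrantLock putMessageLock = new ReentrantLock();
    private final int fileSize;
    private ByteBuffer currentFile;
    private int fileCount = 1;

    AppendSketch(int fileSize) {
        this.fileSize = fileSize;
        this.currentFile = ByteBuffer.allocate(fileSize);
    }

    Status append(byte[] message) {
        putMessageLock.lock(); // only one writer appends at a time
        try {
            if (message.length + END_FILE_MIN_BLANK_LENGTH > currentFile.remaining()) {
                // Not enough room: roll to a new file and let the caller retry.
                currentFile = ByteBuffer.allocate(fileSize);
                fileCount++;
                return Status.END_OF_FILE;
            }
            currentFile.put(message); // in memory only; flushing happens separately
            return Status.PUT_OK;
        } finally {
            putMessageLock.unlock();
        }
    }

    int fileCount() { return fileCount; }
}
```

A caller that receives `END_OF_FILE` simply appends again, and the second attempt lands in the freshly created file.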
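5.2 Memory-Mapping Flow
RocketMQ uses memory-mapped files to improve I/O performance. Whether it is the CommitLog, a ConsumeQueue, or an IndexFile, each individual file has a fixed length. When a file is full, a new one is created, and its file name is the global physical offset of the first message in it.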
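As a small worked example of offset-based file names, the sketch below assumes a file size of 1 GiB and names padded with zeros to 20 digits. Both values are assumptions made for illustration, and `CommitLogNaming` is a hypothetical helper.

```java
import java.util.Locale;

// Hypothetical sketch: derive a fixed-size file's name from the offset of its first byte.
class CommitLogNaming {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // assumed: 1 GiB per file

    // Zero-pad the start offset to 20 digits so names sort in offset order.
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    // Round a physical offset down to the start offset of the file that contains it.
    static long fileStartOffset(long physicalOffset) {
        return physicalOffset - (physicalOffset % FILE_SIZE);
    }
}
```

Under these assumptions the first file is named `00000000000000000000` and the second `00000000001073741824`, so locating the file for any physical offset is a matter of integer arithmetic rather than a directory scan.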
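Steps:
- The memory-mapped file MappedFile is created by AllocateMappedFileService
- Creating a MappedFile follows the classic producer-consumer model
- When MappedFileQueue calls getLastMappedFile to obtain a MappedFile, it puts a request into a queue
- The AllocateMappedFileService thread keeps watching the queue and creates a MappedFile object whenever a request arrives
- Finally the MappedFile is warmed up, which calls the force and mlock methods underneath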
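The producer-consumer hand-off in the steps above can be sketched with a blocking queue and a worker thread. This is a simplified model: `AllocateSketch` is hypothetical, a direct `ByteBuffer` stands in for a MappedFile, and warm-up is left out.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of allocation requests served by a dedicated background thread.
class AllocateSketch {
    static final class Request {
        final int fileSize;
        final CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        Request(int fileSize) { this.fileSize = fileSize; }
    }

    private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>();

    AllocateSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request request = requests.take(); // blocks until a request arrives
                    request.result.complete(ByteBuffer.allocateDirect(request.fileSize));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "allocate-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    // Producer side: enqueue a request, then wait for the worker to fulfil it.
    ByteBuffer allocate(int fileSize) {
        Request request = new Request(fileSize);
        requests.add(request);
        return request.result.join();
    }
}
```

Keeping allocation on its own thread means the slow work of creating a file can be queued ahead of time instead of being paid for inside the message write path.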
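5.3 Flush Flow
After MappedFile's appendMessage is called, the message has only been loaded into a ByteBuffer, that is, into memory; it has not reached disk yet. Persisting it requires flushing memory to disk, and for the CommitLog RocketMQ provides two flush modes.
- Messages a producer sends to the broker are stored in a MappedFile and then synchronized to disk by the flush mechanism
- Flushing is either synchronous or asynchronous
- For asynchronous flushing, a background thread runs at a fixed interval
- Synchronous flushing is also a producer-consumer model. After the broker stores a message in the MappedFile, it creates a GroupCommitRequest, puts it into a list, and blocks waiting. A background thread takes requests from the list and flushes to disk, then notifies the waiting thread once the flush succeeds.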
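The gap between "written into the mapped buffer" and "flushed to disk" is visible in the standard Java NIO API itself. The sketch below uses only JDK classes (`FileChannel.map`, `MappedByteBuffer.force`); `MmapSketch` and its temp-file handling are hypothetical scaffolding, not RocketMQ code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

// Hypothetical sketch: write through a memory-mapped file, then force it to disk.
class MmapSketch {
    // Requires data.length <= fileSize.
    static byte[] roundTrip(byte[] data, int fileSize) {
        try {
            Path file = Files.createTempFile("mmap-sketch", ".log");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping past the current end extends the file to fileSize bytes.
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                mapped.put(data); // the bytes are now in memory, not necessarily on disk
                mapped.force();   // ask the OS to write the mapped region to storage
            }
            byte[] onDisk = Files.readAllBytes(file);
            return Arrays.copyOf(onDisk, data.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The two flush modes differ only in when a call like `force()` happens: right away with the sender waiting (synchronous), or later on a timer in a background thread (asynchronous).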
Synchronous flush (CommitLog.java):
```java
// Wraps a single flush request
public static class GroupCommitRequest {
    // The offset this request must be flushed up to
    private final long nextOffset;
    // Latch the waiting thread blocks on until the flush finishes
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    // Wake up the waiting thread once the flush is done
    public void wakeupCustomer(final boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    public boolean waitForFlush(long timeout) {
        try {
            this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }
}

/**
 * GroupCommit Service
 * Batched flush service
 */
class GroupCommitService extends FlushCommitLogService {
    // List that accepts incoming flush requests (the write side)
    private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
    // List the flush thread reads from when flushing memory to disk (the read side)
    private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

    // Add a flush request
    public void putRequest(final GroupCommitRequest request) {
        synchronized (this) {
            // Append to the write list
            this.requestsWrite.add(request);
            // Wake up the flush thread
            if (!this.hasNotified) {
                this.hasNotified = true;
                this.notify();
            }
        }
    }

    // Swap the read and write lists so producers and the flush thread do not contend on one list
    private void swapRequests() {
        List<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        // The read list has pending requests
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; (i < 2) && !flushOK; i++) {
                    flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());
                    // Not flushed far enough yet (flushOK is false): flush again
                    if (!flushOK) {
                        CommitLog.this.mapedFileQueue.commit(0);
                    }
                }
                // Flush attempt finished: wake up the waiting thread
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            // Clear the read list
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mapedFileQueue.commit(0);
        }
    }

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStoped()) {
            try {
                this.waitForRunning(0);
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        // On a normal shutdown, flush whatever has not been flushed yet
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }
        synchronized (this) {
            this.swapRequests();
        }
        this.doCommit();
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}
```
Asynchronous flush (CommitLog.java):
```java
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    // Keep polling until the service is stopped
    while (!this.isStoped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of dirty pages required before a flush is performed
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
        boolean printFlushProgress = false;
        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // Once the thorough-flush interval has elapsed, flush regardless of the dirty page count
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = ((printTimes++ % 10) == 0);
        }
        try {
            // Either sleep for the full interval, or wait in a way that can be woken up early
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }
            if (printFlushProgress) {
                this.printFlushProgress();
            }
            // Start the flush
            CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }
    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RetryTimesOver && !result; i++) {
        result = CommitLog.this.mapedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    this.printFlushProgress();
    CommitLog.log.info(this.getServiceName() + " service end");
}
```
6. Summary & References
Summary
[Figure: message sending flow chart]
[Figure: message store flow chart]