rocketmq-訊息儲存結構
參考:
https://blog.csdn.net/GAMEloft9/article/details/100562191
https://blog.csdn.net/meilong_whpu/article/details/76919267
https://www.cnblogs.com/fanguangdexiaoyuer/p/10496112.html
RocketMQ訊息儲存結構簡介--CommitLog
RocketMQ訊息儲存是整個系統的核心,直接決定著吞吐效能和高可用性。RocketMQ儲存訊息並沒有藉助oracle、mysql等關係型資料庫,而是直接操作檔案。藉助java NIO的力量,使得I/O效能十分高。當訊息來的時候,順序寫入CommitLog。為了Consumer消費訊息的時候,能夠方便的根據topic查詢訊息,在CommitLog的基礎上衍生出了CosumerQueue檔案,存放了某topic的訊息在CommitLog中的偏移位置。此外為了支援根據訊息key查詢訊息,還構建了index檔案。這三個檔案(邏輯上是三個),就是RocketMQ的主要儲存內容,大致結構如下圖所示(圖片來自書籍《RocketMQ技術內幕》):
限於篇幅,這篇文章僅介紹CommitLog相關的一些東西。
CommitLog類、MappedFileQueue、MappedFile
CommitLog名字取得非常好,除了訊息本身,它記錄了訊息的方方面面的資訊,通過一條CommitLog我們可以還原出很多東西。例如訊息是何時、由哪個producer傳送的,被髮送到了哪個訊息佇列,屬於哪個topic,有哪些屬性等等。RokcetMQ儲存的訊息其實儲存的就是這個CommitLog記錄。後面的敘述中,如果沒有特別說明,我們可以將CommitLog記錄等同於訊息,而CommitLog特指儲存訊息的檔案。
CommitLog類如下所示:
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
private final MappedFileQueue mappedFileQueue; // 存放的檔案佇列
// 省略程式碼
}
CommitLog類屬性很多,但是最重要的是mappedFileQueue屬性。之前我們一直說訊息最終儲存在CommitLog裡,實際上CommitLog是一個邏輯上的概念。真正的檔案是一個個MappedFile,然後組成了mappedFileQueue。一個MappedFile最多能存放1G的CommitLog,這個大小在MessageStoreConfi類裡面定義了的:
當一個MappedFile寫滿了之後,就會建立第二MappedFile,然後繼續存CommitLog,如下所示:
MappedFileQueue是存放MappedFile的佇列結構,如下所示:
public class MappedFileQueue {
private static final int DELETE_FILES_BATCH_MAX = 10;
private final String storePath; // 檔案儲存路徑,例如/home/gameloft9/store/commitlog
private final int mappedFileSize; // 單個MappedFile大小,預設1G
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>(); // mappedFile列表
private final AllocateMappedFileService allocateMappedFileService; // 分配mappedFile的service
// 省略程式碼
}
MappedFileQueue存放了一個個MappedFile,然後還記錄了一些額外的資訊,例如儲存檔案路徑、單個MappedFile大小等等。
下面我們再來看MappedFile的結構:
public class MappedFile extends ReferenceResource {
protected int fileSize;
protected FileChannel fileChannel; // 讀寫通道
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null; // 緩衝區,使用的是直接堆外記憶體
private String fileName;
private File file;
private MappedByteBuffer mappedByteBuffer; // 記憶體對映
}
如果之前沒有接觸過FileChannel、ByteBuffer、MappedByteBuffer,可以先補習一下這篇文章:ByteBuffer介紹
writeBuffer使用的是堆外記憶體,mappedByteBuffer是直接將檔案對映到記憶體中,兩者的使用是互斥的。如果啟用了臨時緩衝池(預設不啟用),那麼就會使用writeBuffer寫commitlog,否則就是mappedBtyeBuffer寫commitlog。
CommitLog、MappedFileQueue和MappedFile的大致關係如下所示:
CommitLog記錄
一條CommitLog記錄包括哪些內容呢?CommitLog要實現的功能,決定了它需要儲存哪些內容。首先要實現訊息的儲存,肯定需要把訊息存下來。其次,為了方便建立ConsumerQueue,需要記錄topic、queueId等資訊。為了能跟蹤訊息,需要記錄訊息傳送方地址、傳送時間等。。。
完整的CommitLog記錄如下所示:
為什麼會先跳過訊息儲存流程先講儲存內容結構呢?因為流程實在是太複雜、內容太多了,很容易暈頭轉向。如果先對儲存的內容有一個大致的概念,後面再理解訊息儲存過程會好很多。骨頭難啃,總要挑一處簡單的下嘴嘛。下面從功能上解釋為什麼要存這些欄位。不瞭解整個流程,理解這些欄位也會有些困難,我會盡量保證通俗易懂。
TotalSize
TotalSize很好理解,就是整個CommitLog記錄的大小,包括上面列出來的所有欄位大小。TotalSize佔用4個位元組,在往byteBuffer寫CommitLog的時候,首先就會寫入這個CommitLog大小,如下所示:
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
MagicCode
MagicCode是一個特殊的欄位,它可以標誌ByteBuffer中的某個CommitLog是一個正常的CommitLog,還是因為ByteBuffer沒有多餘的空間存放該CommitLog,導致該CommitLog是一個空的CommitLog。
MagicCode有兩個值,如下所示:
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
MESSAGE_MAGIC_CODE表明該CommitLog記錄是一條正常的記錄,BLANK_MAGIC_CODE表明該CommitLog記錄是一個空的CommitLog記錄。
如果儲存CommitLog發現空間不夠,會馬上開闢第二個檔案重新儲存CommitLog記錄,但是之前的空的CommitLog也一樣會儲存下來。在Broker正常退出或者異常退出,重啟之後需要恢復Broker的時候,就會根據這個MagicCode判斷該條CommitLog是否是正常的。
BodyCRC
學過計算機網路的人一定知道CRC,CRC即迴圈冗餘校驗碼,是資料通訊領域中最常用的一種查錯校驗碼,通過CRC就可以知道資料的正確性和完整性。RocketMQ通過CRC來校驗訊息部分,如下所示:
if (checkCRC) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
}
}
queueId
queueId很熟悉的,就是訊息發往哪個佇列。queueId在producer傳送訊息時會選擇出來,這個在文章RocketMQ傳送訊息時如何選擇佇列中已經很詳細的講過了,這裡就不再贅述queueId是如何產生的了。
Topic下會有一堆訊息佇列(ConsumerQueue),RocketMQ在儲存完訊息後,會隨後構建ConsumerQueue,裡面存放著Topic下訊息的在CommitLog檔案中的偏移量,方便根據Topic查詢消費訊息。ConsumerQueue的構建、訊息的消費都是重點內容,會在單獨的文章中進行介紹。
Flag
暫時不知道有什麼用,預設值是0。
QueueOffset
我們之前講過,為了方便Consumer能根據Topic快速的查詢訊息,在CommitLog的基礎上構建了ConsumerQueue,裡面存放了某個Topic下面的所有訊息在CommitLog中的位置。
同樣的,這裡的QueueOffset存放了訊息記錄應該在ConsumerQueue中的位置,這樣構建ConsumerQueue的時候,就知道該條記錄在ConsummerQueue的位置順序,在消費訊息的時候很有用處。QueueOffset一般是是累加1的,如下所示:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
這個與ConsumerQueue的儲存結構有關,後面介紹ConsumerQueue儲存結構的時候會涉及到。
PhysicalOffset
這個很簡單了,就是訊息在CommitLog中的物理位置。需要注意的是,我們CommitLog對應著磁碟上的多個檔案,這裡的偏移量不是從某個檔案開始算的,而是從第一個檔案偏移開始算起的。
SysFlag
SysFlag是RocketMQ內部使用的標記位,通過位運算進行標記。例如是否對訊息進行了壓縮、是否屬於事務訊息。SysFlag初始值為0,可與下面的標記進行位運算。
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1; // 2
public final static int TRANSACTION_NOT_TYPE = 0; // 不參與位運算,用作結果比較,表示無事務
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // 4
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2; // 8
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12
例如對訊息進行了壓縮,那麼SysFlag = 0 | 0x1。又例如flag & TRANSACTION_ROLLBACK_TYPE,可以判斷訊息是否是事務訊息,如果等於0說明不是事務訊息。
BornTimestamp
Producer傳送訊息的時間,如下所示:
requestHeader.setBornTimestamp(System.currentTimeMillis());
BornHost
Producer傳送訊息使用的套接字地址
msgInner.setBornHost(ctx.channel().remoteAddress());
CommitLog存的時候是讀取4個位元組的rao ip + 4個位元組的埠號:
byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
byteBuffer.putInt(inetSocketAddress.getPort());
StoreTimestamp
訊息在broker上儲存時間。
StoreHostAddress
Broker的套接字地址,儲存方式同BornHost。
ReconsumeTimes
重複消費次數,初始為0。我們消費訊息的時候,如果發生異常,可以選擇晚一點重新消費,如下所示:
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try{
for(MessageExt msg : msgs){
if(msg.getTopic().equals("test")){
log.info("收到test型別訊息:" + new String(msg.getBody()));
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消費成功
}catch(Exception e){
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍後重新消費
}
}
});
Broker重試的時候,這個ReconsumeTimes就會+1,預設最大重試次數是16次。
PreparedTransactionOffset
事務訊息相關的一個屬性。RocketMQ事務訊息基於兩階段提交,這裡僅僅瞭解一點就夠了,涉及到事務訊息的時候會再提到。
Body
訊息體,沒什麼好說的。需要注意的是,Body前面其實會有4位元組(int)的Body長度,這裡沒有畫出來。
Topic
主題,沒什麼好說的。需要注意的是,Topic前面其實會有1位元組(byte)的Topic長度,這裡沒有畫出來。
Properties
訊息屬性。需要注意的是,Properties前面其實會有2位元組(short)的Properties長度,這裡沒有畫出來。
Properties既存放了RocketMQ內部用到的一些屬性,也存放了使用者的一些屬性。例如傳送訊息的TAG就存放在Properties裡面:
Message msg = new Message("test",// topic
"TagB",// tag就會存放在Properties裡面
("我發了一條訊息").getBytes());// body
Properties中的一些常用key都定義在了MessageConstant裡面,如下所示:
public class MessageConst {
public static final String PROPERTY_KEYS = "KEYS";
public static final String PROPERTY_TAGS = "TAGS";
public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
public static final String PROPERTY_BUYER_ID = "BUYER_ID";
public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
public static final String PROPERTY_MSG_REGION = "MSG_REGION";
public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
}
小結
通過了解CommitLog記錄的一些屬性,可以幫助我們更好的瞭解RocketMQ訊息儲存、消費的一些細節。