1. 程式人生 > 實用技巧 >rocketmq-訊息儲存結構

rocketmq-訊息儲存結構

參考:

https://blog.csdn.net/GAMEloft9/article/details/100562191

https://blog.csdn.net/meilong_whpu/article/details/76919267

https://www.cnblogs.com/fanguangdexiaoyuer/p/10496112.html

RocketMQ訊息儲存結構簡介--CommitLog

RocketMQ訊息儲存是整個系統的核心,直接決定著吞吐效能和高可用性。RocketMQ儲存訊息並沒有藉助oracle、mysql等關係型資料庫,而是直接操作檔案。藉助java NIO的力量,使得I/O效能十分高。當訊息來的時候,順序寫入CommitLog。為了Consumer消費訊息的時候,能夠方便的根據topic查詢訊息,在CommitLog的基礎上衍生出了CosumerQueue檔案,存放了某topic的訊息在CommitLog中的偏移位置。此外為了支援根據訊息key查詢訊息,還構建了index檔案。這三個檔案(邏輯上是三個),就是RocketMQ的主要儲存內容,大致結構如下圖所示(圖片來自書籍《RocketMQ技術內幕》):

限於篇幅,這篇文章僅介紹CommitLog相關的一些東西。

CommitLog類、MappedFileQueue、MappedFile

CommitLog名字取得非常好,除了訊息本身,它記錄了訊息的方方面面的資訊,通過一條CommitLog我們可以還原出很多東西。例如訊息是何時、由哪個producer傳送的,被髮送到了哪個訊息佇列,屬於哪個topic,有哪些屬性等等。RokcetMQ儲存的訊息其實儲存的就是這個CommitLog記錄。後面的敘述中,如果沒有特別說明,我們可以將CommitLog記錄等同於訊息,而CommitLog特指儲存訊息的檔案。

CommitLog類如下所示:

public class CommitLog {
    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
    // End of file empty MAGIC CODE cbd43194
    private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
   
    private final MappedFileQueue mappedFileQueue; // 存放的檔案佇列

    // 省略程式碼
} 

CommitLog類屬性很多,但是最重要的是mappedFileQueue屬性。之前我們一直說訊息最終儲存在CommitLog裡,實際上CommitLog是一個邏輯上的概念。真正的檔案是一個個MappedFile,然後組成了mappedFileQueue。一個MappedFile最多能存放1G的CommitLog,這個大小在MessageStoreConfi類裡面定義了的:

當一個MappedFile寫滿了之後,就會建立第二MappedFile,然後繼續存CommitLog,如下所示:

MappedFileQueue是存放MappedFile的佇列結構,如下所示:

public class MappedFileQueue {
    private static final int DELETE_FILES_BATCH_MAX = 10;

    private final String storePath; // 檔案儲存路徑,例如/home/gameloft9/store/commitlog

    private final int mappedFileSize; // 單個MappedFile大小,預設1G

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>(); // mappedFile列表

    private final AllocateMappedFileService allocateMappedFileService; // 分配mappedFile的service
    
    // 省略程式碼
 }   


MappedFileQueue存放了一個個MappedFile,然後還記錄了一些額外的資訊,例如儲存檔案路徑、單個MappedFile大小等等。
下面我們再來看MappedFile的結構:

public class MappedFile extends ReferenceResource {
    protected int fileSize;
    protected FileChannel fileChannel; // 讀寫通道
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null; // 緩衝區,使用的是直接堆外記憶體
    
    private String fileName;
    private File file;
    
    private MappedByteBuffer mappedByteBuffer; // 記憶體對映
}


如果之前沒有接觸過FileChannel、ByteBuffer、MappedByteBuffer,可以先補習一下這篇文章:ByteBuffer介紹
writeBuffer使用的是堆外記憶體,mappedByteBuffer是直接將檔案對映到記憶體中,兩者的使用是互斥的。如果啟用了臨時緩衝池(預設不啟用),那麼就會使用writeBuffer寫commitlog,否則就是mappedBtyeBuffer寫commitlog。
CommitLog、MappedFileQueue和MappedFile的大致關係如下所示:

CommitLog記錄

一條CommitLog記錄包括哪些內容呢?CommitLog要實現的功能,決定了它需要儲存哪些內容。首先要實現訊息的儲存,肯定需要把訊息存下來。其次,為了方便建立ConsumerQueue,需要記錄topic、queueId等資訊。為了能跟蹤訊息,需要記錄訊息傳送方地址、傳送時間等。。。
完整的CommitLog記錄如下所示:

為什麼會先跳過訊息儲存流程先講儲存內容結構呢?因為流程實在是太複雜、內容太多了,很容易暈頭轉向。如果先對儲存的內容有一個大致的概念,後面再理解訊息儲存過程會好很多。骨頭難啃,總要挑一處簡單的下嘴嘛。下面從功能上解釋為什麼要存這些欄位。不瞭解整個流程,理解這些欄位也會有些困難,我會盡量保證通俗易懂。

TotalSize

TotalSize很好理解,就是整個CommitLog記錄的大小,包括上面列出來的所有欄位大小。TotalSize佔用4個位元組,在往byteBuffer寫CommitLog的時候,首先就會寫入這個CommitLog大小,如下所示:

 // 1 TOTALSIZE
 this.msgStoreItemMemory.putInt(msgLen);


MagicCode

MagicCode是一個特殊的欄位,它可以標誌ByteBuffer中的某個CommitLog是一個正常的CommitLog,還是因為ByteBuffer沒有多餘的空間存放該CommitLog,導致該CommitLog是一個空的CommitLog。

MagicCode有兩個值,如下所示:

    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
    // End of file empty MAGIC CODE cbd43194
    private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;


MESSAGE_MAGIC_CODE表明該CommitLog記錄是一條正常的記錄,BLANK_MAGIC_CODE表明該CommitLog記錄是一個空的CommitLog記錄。

如果儲存CommitLog發現空間不夠,會馬上開闢第二個檔案重新儲存CommitLog記錄,但是之前的空的CommitLog也一樣會儲存下來。在Broker正常退出或者異常退出,重啟之後需要恢復Broker的時候,就會根據這個MagicCode判斷該條CommitLog是否是正常的。

BodyCRC

學過計算機網路的人一定知道CRC,CRC即迴圈冗餘校驗碼,是資料通訊領域中最常用的一種查錯校驗碼,通過CRC就可以知道資料的正確性和完整性。RocketMQ通過CRC來校驗訊息部分,如下所示:

if (checkCRC) {
    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
    if (crc != bodyCRC) {
       log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
       return new DispatchRequest(-1, false/* success */);
    }
}


queueId

queueId很熟悉的,就是訊息發往哪個佇列。queueId在producer傳送訊息時會選擇出來,這個在文章RocketMQ傳送訊息時如何選擇佇列中已經很詳細的講過了,這裡就不再贅述queueId是如何產生的了。
Topic下會有一堆訊息佇列(ConsumerQueue),RocketMQ在儲存完訊息後,會隨後構建ConsumerQueue,裡面存放著Topic下訊息的在CommitLog檔案中的偏移量,方便根據Topic查詢消費訊息。ConsumerQueue的構建、訊息的消費都是重點內容,會在單獨的文章中進行介紹。

Flag

暫時不知道有什麼用,預設值是0。

QueueOffset

我們之前講過,為了方便Consumer能根據Topic快速的查詢訊息,在CommitLog的基礎上構建了ConsumerQueue,裡面存放了某個Topic下面的所有訊息在CommitLog中的位置。

同樣的,這裡的QueueOffset存放了訊息記錄應該在ConsumerQueue中的位置,這樣構建ConsumerQueue的時候,就知道該條記錄在ConsummerQueue的位置順序,在消費訊息的時候很有用處。QueueOffset一般是是累加1的,如下所示:

case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
     // The next update ConsumeQueue information
     CommitLog.this.topicQueueTable.put(key, ++queueOffset);
      break;


這個與ConsumerQueue的儲存結構有關,後面介紹ConsumerQueue儲存結構的時候會涉及到。

PhysicalOffset

這個很簡單了,就是訊息在CommitLog中的物理位置。需要注意的是,我們CommitLog對應著磁碟上的多個檔案,這裡的偏移量不是從某個檔案開始算的,而是從第一個檔案偏移開始算起的。

SysFlag

SysFlag是RocketMQ內部使用的標記位,通過位運算進行標記。例如是否對訊息進行了壓縮、是否屬於事務訊息。SysFlag初始值為0,可與下面的標記進行位運算。

    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1; // 2
    public final static int TRANSACTION_NOT_TYPE = 0; // 不參與位運算,用作結果比較,表示無事務
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // 4
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2; // 8
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12


例如對訊息進行了壓縮,那麼SysFlag = 0 | 0x1。又例如flag & TRANSACTION_ROLLBACK_TYPE,可以判斷訊息是否是事務訊息,如果等於0說明不是事務訊息。
BornTimestamp

Producer傳送訊息的時間,如下所示:

 requestHeader.setBornTimestamp(System.currentTimeMillis());

BornHost

Producer傳送訊息使用的套接字地址

 msgInner.setBornHost(ctx.channel().remoteAddress());

CommitLog存的時候是讀取4個位元組的rao ip + 4個位元組的埠號:

 byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
 byteBuffer.putInt(inetSocketAddress.getPort());


StoreTimestamp

訊息在broker上儲存時間。

StoreHostAddress

Broker的套接字地址,儲存方式同BornHost。

ReconsumeTimes

重複消費次數,初始為0。我們消費訊息的時候,如果發生異常,可以選擇晚一點重新消費,如下所示:

consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                try{
                    for(MessageExt msg : msgs){
                        if(msg.getTopic().equals("test")){
                            log.info("收到test型別訊息:" + new String(msg.getBody()));
                        }
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消費成功
                }catch(Exception e){
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍後重新消費
                }
            }
        });


Broker重試的時候,這個ReconsumeTimes就會+1,預設最大重試次數是16次。

PreparedTransactionOffset

事務訊息相關的一個屬性。RocketMQ事務訊息基於兩階段提交,這裡僅僅瞭解一點就夠了,涉及到事務訊息的時候會再提到。

Body

訊息體,沒什麼好說的。需要注意的是,Body前面其實會有4位元組(int)的Body長度,這裡沒有畫出來。

Topic

主題,沒什麼好說的。需要注意的是,Topic前面其實會有1位元組(byte)的Topic長度,這裡沒有畫出來。

Properties

訊息屬性。需要注意的是,Properties前面其實會有2位元組(short)的Properties長度,這裡沒有畫出來。
Properties既存放了RocketMQ內部用到的一些屬性,也存放了使用者的一些屬性。例如傳送訊息的TAG就存放在Properties裡面:

   Message msg = new Message("test",// topic
                            "TagB",// tag就會存放在Properties裡面
                            ("我發了一條訊息").getBytes());// body


Properties中的一些常用key都定義在了MessageConstant裡面,如下所示:

public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
}


小結

通過了解CommitLog記錄的一些屬性,可以幫助我們更好的瞭解RocketMQ訊息儲存、消費的一些細節。