1. 程式人生 > 其它 >RocketMQ 訊息儲存和查詢實戰

RocketMQ 訊息儲存和查詢實戰

RocketMQ 作為一款優秀的分散式訊息中介軟體,可以為業務方提供高效能低延遲的穩定可靠的訊息服務。其核心優勢是可靠的消費儲存、訊息傳送的高效能和低延遲、強大的訊息堆積能力和訊息處理能力。 從儲存方式來看,主要有幾個方面:
  • 檔案系統
  • 分散式KV儲存
  • 關係型資料庫
從效率上來講,檔案系統高於KV儲存,KV儲存又高於關係型資料庫。因為直接操作檔案系統肯定是最快的,那麼業界主流的訊息佇列中介軟體,如RocketMQ 、RabbitMQ 、kafka 都是採用檔案系統的方式來儲存訊息。 今天,我們就從它的儲存檔案入手,來探索一下 RocketMQ 訊息儲存的機制。

一、CommitLog

CommitLog,訊息儲存檔案,所有主題的訊息都儲存在 CommitLog 檔案中。 我們的業務系統向 RocketMQ 傳送一條訊息,不管在中間經歷了多麼複雜的流程,最終這條訊息會被持久化到CommitLog檔案。 我們知道,一臺Broker伺服器只有一個CommitLog檔案(組),RocketMQ會將所有主題的訊息儲存在同一個檔案中,這個檔案中就儲存著一條條Message,每條Message都會按照順序寫入。 也許有時候,你會希望看看這個CommitLog檔案中,儲存的內容到底長什麼樣子?

1、訊息傳送

當然,我們需要先往 CommitLog 檔案中寫入一些內容,所以先來看一個訊息傳送的例子。
public static void main(String[] args) throws Exception {
    MQProducer producer = getProducer();
    for (int i = 0;i<10;i++){
        Message message = new Message();
        message.setTopic("topic"+i);
        message.setBody(("清幽之地的部落格").getBytes());
        SendResult sendResult 
= producer.send(message); } producer.shutdown(); }
我們向10個不同的主題中傳送訊息,如果只有一臺Broker機器,它們會儲存到同一個CommitLog檔案中。此時,這個檔案的位置處於 C:/Users/shiqizhen/store/commitlog/00000000000000000000。

2、讀取檔案內容

這個檔案我們不能直接開啟,因為它是一個二進位制檔案,所以我們需要通過程式來讀取它的位元組陣列。
public static ByteBuffer read(String path)throws Exception{
    File file = new File(path);
    FileInputStream fin 
= new FileInputStream(file); byte[] bytes = new byte[(int)file.length()]; fin.read(bytes); ByteBuffer buffer = ByteBuffer.wrap(bytes); return buffer; }

如上程式碼,可以通過傳入檔案的路徑,讀取該檔案所有的內容。為了方便下一步操作,我們把讀取到的位元組陣列轉換為java.nio.ByteBuffer物件。

3、解析

在解析之前,我們需要弄明白兩件事:
  • 訊息的格式,即一條訊息包含哪些欄位;
  • 每個欄位所佔的位元組大小。
在上面的圖中,我們已經看到了訊息的格式,包含了19個欄位。關於位元組大小,有的是 4 位元組,有的是 8 位元組,我們不再一一贅述,直接看程式碼。
/**
 * commitlog 檔案解析
 * @param byteBuffer
 * @return
 * @throws Exception
 */
public static MessageExt decodeCommitLog(ByteBuffer byteBuffer)throws Exception {

    MessageExt msgExt = new MessageExt();

    // 1 TOTALSIZE
    int storeSize = byteBuffer.getInt();
    msgExt.setStoreSize(storeSize);

    if (storeSize<=0){
        return null;
    }

    // 2 MAGICCODE
    byteBuffer.getInt();

    // 3 BODYCRC
    int bodyCRC = byteBuffer.getInt();
    msgExt.setBodyCRC(bodyCRC);

    // 4 QUEUEID
    int queueId = byteBuffer.getInt();
    msgExt.setQueueId(queueId);

    // 5 FLAG
    int flag = byteBuffer.getInt();
    msgExt.setFlag(flag);

    // 6 QUEUEOFFSET
    long queueOffset = byteBuffer.getLong();
    msgExt.setQueueOffset(queueOffset);

    // 7 PHYSICALOFFSET
    long physicOffset = byteBuffer.getLong();
    msgExt.setCommitLogOffset(physicOffset);

    // 8 SYSFLAG
    int sysFlag = byteBuffer.getInt();
    msgExt.setSysFlag(sysFlag);

    // 9 BORNTIMESTAMP
    long bornTimeStamp = byteBuffer.getLong();
    msgExt.setBornTimestamp(bornTimeStamp);

    // 10 BORNHOST
    int bornhostIPLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 : 16;
    byte[] bornHost = new byte[bornhostIPLength];
    byteBuffer.get(bornHost, 0, bornhostIPLength);
    int port = byteBuffer.getInt();
    msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));

    // 11 STORETIMESTAMP
    long storeTimestamp = byteBuffer.getLong();
    msgExt.setStoreTimestamp(storeTimestamp);

    // 12 STOREHOST
    int storehostIPLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16;
    byte[] storeHost = new byte[storehostIPLength];
    byteBuffer.get(storeHost, 0, storehostIPLength);
    port = byteBuffer.getInt();
    msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));

    // 13 RECONSUMETIMES
    int reconsumeTimes = byteBuffer.getInt();
    msgExt.setReconsumeTimes(reconsumeTimes);

    // 14 Prepared Transaction Offset
    long preparedTransactionOffset = byteBuffer.getLong();
    msgExt.setPreparedTransactionOffset(preparedTransactionOffset);

    // 15 BODY
    int bodyLen = byteBuffer.getInt();
    if (bodyLen > 0) {
        byte[] body = new byte[bodyLen];
        byteBuffer.get(body);
        msgExt.setBody(body);
    }

    // 16 TOPIC
    byte topicLen = byteBuffer.get();
    byte[] topic = new byte[(int) topicLen];
    byteBuffer.get(topic);
    msgExt.setTopic(new String(topic, CHARSET_UTF8));

    // 17 properties
    short propertiesLength = byteBuffer.getShort();
    if (propertiesLength > 0) {
        byte[] properties = new byte[propertiesLength];
        byteBuffer.get(properties);
        String propertiesString = new String(properties, CHARSET_UTF8);
        Map<String, String> map = string2messageProperties(propertiesString);
    }
    int msgIDLength = storehostIPLength + 4 + 8;
    ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIDLength);
    String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
    msgExt.setMsgId(msgId);

    return msgExt;
}

4、輸出訊息內容

public static void main(String[] args) throws Exception {
    String filePath = "C:\\Users\\shiqizhen\\store\\commitlog\\00000000000000000000";
    ByteBuffer buffer = read(filePath);
    List<MessageExt> messageList = new ArrayList<>();
    while (true){
        MessageExt message = decodeCommitLog(buffer);
        if (message==null){
            break;
        }
        messageList.add(message);
    }
    for (MessageExt ms:messageList) {
        System.out.println("主題:"+ms.getTopic()+" 訊息:"+
            new String(ms.getBody())+"佇列ID:"+ms.getQueueId()+" 儲存地址:"+ms.getStoreHost());
    }
}

執行這段程式碼,我們就可以直接看到CommitLog檔案中的內容:

主題:topic0 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:1 儲存地址:/192.168.44.1:10911
主題:topic1 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:0 儲存地址:/192.168.44.1:10911
主題:topic2 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:1 儲存地址:/192.168.44.1:10911
主題:topic3 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:0 儲存地址:/192.168.44.1:10911
主題:topic4 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:3 儲存地址:/192.168.44.1:10911
主題:topic5 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:1 儲存地址:/192.168.44.1:10911
主題:topic6 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:2 儲存地址:/192.168.44.1:10911
主題:topic7 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:3 儲存地址:/192.168.44.1:10911
主題:topic8 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:2 儲存地址:/192.168.44.1:10911
主題:topic9 訊息:RocketMQ訊息儲存和查詢實戰 佇列ID:0 儲存地址:/192.168.44.1:10911

不用過多的文字描述,通過上面這些程式碼,相信你對CommitLog檔案就有了更進一步的瞭解。

此時,我們再考慮另外一個問題: CommitLog 檔案儲存了所有主題的訊息,但我們消費時,更多的是訂閱某一個主題進行消費。RocketMQ是怎麼樣進行高效的檢索訊息的呢 ?

二、ConsumeQueue

為了解決上面那個問題,RocketMQ引入了ConsumeQueue消費佇列檔案。 在繼續往下說ConsumeQueue之前,我們必須先了解到另外一個概念,即MessageQueue。

1、MessageQueue

我們知道,在傳送訊息的時候,要指定一個Topic。那麼,在建立Topic的時候,有一個很重要的引數MessageQueue。簡單來說,就是你這個Topic對應了多少個佇列,也就是幾個MessageQueue,預設是4個。那它的作用是什麼呢 ? 它是一個數據分片的機制。比如我們的Topic裡面有100條資料,該Topic預設是4個佇列,那麼每個佇列中大約25條資料。 然後,這些MessageQueue是和Broker繫結在一起的,就是說每個MessageQueue都可能處於不同的Broker機器上,這取決於你的佇列數量和Broker叢集。 我們來看上面的圖片,Topic名稱為order的主題,一共有4個MessageQueue,每個裡面都有25條資料。因為在筆者的本地環境只有一個Broker,所以它們的brokerName都是指向同一臺機器。 既然MessageQueue是多個,那麼在訊息傳送的時候,勢必要通過某種方式選擇一個佇列。預設的情況下,就是通過輪詢來獲取一個訊息佇列。
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
當然,RocketMQ還有一個故障延遲機制,在選擇訊息佇列的時候會複雜一些,我們今天先不討論。

2、ConsumeQueue

說完了MessageQueue,我們接著來看ConsumerQueue。上面我們說,它是為了高效檢索主題訊息的。 ConsumerQueue也是一組組檔案,它的位置在C:/Users/shiqizhen/store/consumequeue。該目錄下面是以Topic命名的資料夾,然後再下一級是以MessageQueue佇列ID命名的資料夾,最後才是一個或多個檔案。 這樣分層之後,RocketMQ至少可以得到以下幾個訊息:
  • 先通過主題名稱,可以定位到具體的資料夾;
  • 然後根據訊息佇列ID找到具體的檔案;
  • 最後根據檔案內容,找到具體的訊息。
那麼,這個檔案裡面儲存的又是什麼內容呢 ?

3、解析檔案

為了加速ConsumerQueue的檢索速度和節省磁碟空間,檔案中不會儲存訊息的全量訊息。其儲存的格式如下: 同樣的,我們先寫一段程式碼,按照這個格式輸出一下ConsumerQueue檔案的內容。
public static void main(String[] args)throws Exception {
    String path = "C:\\Users\\shiqizhen\\store\\consumequeue\\order\\0\\00000000000000000000";
    ByteBuffer buffer = read(path);
    while (true){
        long offset = buffer.getLong();
        long size = buffer.getInt();
        long code = buffer.getLong();
        if (size==0){
            break;
        }
        System.out.println("訊息長度:"+size+" 訊息偏移量:" +offset);
    }
    System.out.println("--------------------------");
}

在前面,我們已經向order這個主題中寫了100條資料,所以在這裡它的order#messagequeue#0裡面有25條記錄。

訊息長度:173 訊息偏移量:2003
訊息長度:173 訊息偏移量:2695
訊息長度:173 訊息偏移量:3387
訊息長度:173 訊息偏移量:4079
訊息長度:173 訊息偏移量:4771
訊息長度:173 訊息偏移量:5463
訊息長度:173 訊息偏移量:6155
訊息長度:173 訊息偏移量:6847
訊息長度:173 訊息偏移量:7539
訊息長度:173 訊息偏移量:8231
訊息長度:173 訊息偏移量:8923
訊息長度:173 訊息偏移量:9615
訊息長度:173 訊息偏移量:10307
訊息長度:173 訊息偏移量:10999
訊息長度:173 訊息偏移量:11691
訊息長度:173 訊息偏移量:12383
訊息長度:173 訊息偏移量:13075
訊息長度:173 訊息偏移量:13767
訊息長度:173 訊息偏移量:14459
訊息長度:173 訊息偏移量:15151
訊息長度:173 訊息偏移量:15843
訊息長度:173 訊息偏移量:16535
訊息長度:173 訊息偏移量:17227
訊息長度:173 訊息偏移量:17919
訊息長度:173 訊息偏移量:18611
--------------------------

細心的朋友,肯定發現了。上面輸出的結果中,訊息偏移量的差值等於 = 訊息長度 * 佇列長度。

4、查詢訊息

現在我們通過ConsumerQueue已經知道了訊息的長度和偏移量,那麼查詢訊息就比較容易了。
public static MessageExt getMessageByOffset(ByteBuffer commitLog,long offset,int size) throws Exception {
    ByteBuffer slice = commitLog.slice();
    slice.position((int)offset);
    slice.limit((int) (offset+size));
    MessageExt message = CommitLogTest.decodeCommitLog(slice);
    return message;
}

然後,我們可以依靠這種方法,來實現通過ConsumerQueue獲取訊息的具體內容。

public static void main(String[] args) throws Exception {

    //consumerqueue根目錄
    String consumerPath = "C:\\Users\\shiqizhen\\store\\consumequeue";
    //commitlog目錄
    String commitLogPath = "C:\\Users\\shiqizhen\\store\\commitlog\\00000000000000000000";
    //讀取commitlog檔案內容
    ByteBuffer commitLogBuffer = CommitLogTest.read(commitLogPath);
    
    //遍歷consumerqueue目錄下的所有檔案
    File file = new File(consumerPath);
    File[] files = file.listFiles();
    for (File f:files) {
        if (f.isDirectory()){
            File[] listFiles = f.listFiles();
            for (File queuePath:listFiles) {
                String path = queuePath+"/00000000000000000000";
                //讀取consumerqueue檔案內容
                ByteBuffer buffer = CommitLogTest.read(path);
                while (true){
                    //讀取訊息偏移量和訊息長度
                    long offset = (int) buffer.getLong();
                    int size = buffer.getInt();
                    long code = buffer.getLong();
                    if (size==0){
                        break;
                    }
                    //根據偏移量和訊息長度,在commitloh檔案中讀取訊息內容
                    MessageExt message = getMessageByOffset(commitLogBuffer,offset,size);
                    if (message!=null){
                        System.out.println("訊息主題:"+message.getTopic()+" MessageQueue:"+
                            message.getQueueId()+" 訊息體:"+new String(message.getBody()));
                    }
                }
            }
        }
    }
}

執行這段程式碼,就可以得到之前測試樣例中,10個主題的所有訊息。

訊息主題:topic0 MessageQueue:1 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic1 MessageQueue:0 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic2 MessageQueue:1 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic3 MessageQueue:0 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic4 MessageQueue:3 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic5 MessageQueue:1 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic6 MessageQueue:2 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic7 MessageQueue:3 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic8 MessageQueue:2 訊息體:RocketMQ訊息儲存和查詢實戰
訊息主題:topic9 MessageQueue:0 訊息體:RocketMQ訊息儲存和查詢實戰

5、消費訊息

訊息消費的時候,其查詢訊息的過程也是差不多的。不過值得注意的一點是,ConsumerQueue檔案和CommitLog檔案可能都是多個的,所以會有一個定位檔案的過程,我們來看原始碼。 首先,根據消費進度來查詢對應的ConsumerQueue,獲取其檔案內容。
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    //ConsumerQueue檔案大小
    int mappedFileSize = this.mappedFileSize;   
    //根據消費進度,找到在consumerqueue檔案裡的偏移量
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        //返回ConsumerQueue對映檔案
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            //返回檔案裡的某一塊內容
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}
然後拿到訊息在CommitLog檔案中的偏移量和訊息長度,獲取訊息。
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    //commitlog檔案大小
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    //根據訊息偏移量,定位到具體的commitlog檔案
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        //根據訊息偏移量和長度,獲取訊息內容
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

三、Index

上面我們看到了通過訊息偏移量來查詢訊息的方式,但RocketMQ還提供了其他幾種方式可以查詢訊息。
  • 通過Message Key 查詢;
  • 通過Unique Key查詢;
  • 通過Message Id查詢。
在這裡,Message Key和Unique Key都是在訊息傳送之前,由客戶端生成的。我們可以自己設定,也可以由客戶端自動生成,Message Id是在Broker端儲存訊息的時候生成。

1、通過 Message Id 查詢

Message Id總共 16 位元組,包含訊息儲存主機地址和在CommitLog檔案中的偏移量offset。有原始碼為證:
/**
 * 建立訊息ID
 * @param input     
 * @param addr      Broker伺服器地址
 * @param offset    正在儲存的訊息,在Commitlog中的偏移量
 * @return
 */
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
    input.flip();
    int msgIDLength = addr.limit() == 8 ? 16 : 28;
    input.limit(msgIDLength);
    input.put(addr);
    input.putLong(offset);
    return UtilAll.bytes2string(input.array());
}
當我們根據Message Id向Broker查詢訊息時,首先會通過一個decodeMessageId方法,將Broker地址和訊息的偏移量解析出來。
public static MessageId decodeMessageId(final String msgId) throws Exception {
    SocketAddress address;
    long offset;
    int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
    byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
    byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
    ByteBuffer bb = ByteBuffer.wrap(port);
    int portInt = bb.getInt(0);
    //解析出來Broker地址
    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
    //偏移量
    byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
    bb = ByteBuffer.wrap(data);
    offset = bb.getLong(0);
    return new MessageId(address, offset);
}
所以通過Message Id查詢訊息的時候,實際上還是直接從特定Broker上的CommitLog指定位置進行查詢,屬於精確查詢。 這個也沒問題,但是如果通過 Message Key 和 Unique Key 查詢的時候,RocketMQ 又是怎麼做的呢?

2、index索引檔案

ConsumerQueue訊息消費佇列是專門為訊息訂閱構建的索引檔案,提高根據主題與訊息佇列檢索訊息的速度。 另外,RocketMQ引入Hash索引機制,為訊息建立索引,它的鍵就是Message Key 和 Unique Key。 那麼,我們先看看index索引檔案的結構: 為了便於理解,我們還是以程式碼的方式,來解析這個檔案。
public static void main(String[] args) throws Exception {

    //index索引檔案的路徑
    String path = "C:\\Users\\shiqizhen\\store\\index\\20200506224547616";
    ByteBuffer buffer = CommitLogTest.read(path);
    //該索引檔案中包含訊息的最小儲存時間
    long beginTimestamp = buffer.getLong();
    //該索引檔案中包含訊息的最大儲存時間
    long endTimestamp = buffer.getLong();
    //該索引檔案中包含訊息的最大物理偏移量(commitlog檔案偏移量)
    long beginPhyOffset = buffer.getLong();
    //該索引檔案中包含訊息的最大物理偏移量(commitlog檔案偏移量)
    long endPhyOffset = buffer.getLong();
    //hashslot個數
    int hashSlotCount = buffer.getInt();
    //Index條目列表當前已使用的個數
    int indexCount = buffer.getInt();

    //500萬個hash槽,每個槽佔4個位元組,儲存的是index索引
    for (int i=0;i<5000000;i++){
        buffer.getInt();
    }
    //2000萬個index條目
    for (int j=0;j<20000000;j++){
        //訊息key的hashcode
        int hashcode = buffer.getInt();
        //訊息對應的偏移量
        long offset = buffer.getLong();
        //訊息儲存時間和第一條訊息的差值
        int timedif = buffer.getInt();
        //該條目的上一條記錄的index索引
        int pre_no = buffer.getInt();
    }
    System.out.println(buffer.position()==buffer.capacity());
}
我們看最後輸出的結果為true,則證明解析的過程無誤。

3、構建索引

我們傳送的訊息體中,包含Message Key 或 Unique Key,那麼就會給它們每一個都構建索引。 這裡重點有兩個:
  • 根據訊息Key計算Hash槽的位置;
  • 根據Hash槽的數量和Index索引來計算Index條目的起始位置。
將當前 Index條目 的索引值,寫在Hash槽absSlotPos位置上;將Index條目的具體資訊(hashcode/訊息偏移量/時間差值/hash槽的值),從起始偏移量absIndexPos開始,順序按位元組寫入。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //計算key的hash
        int keyHash = indexKeyHashMethod(key);
        //計算hash槽的座標
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
        //計算時間差值
        long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
        timeDiff = timeDiff / 1000;
        //計算INDEX條目的起始偏移量
        int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + this.indexHeader.getIndexCount() * indexSize;
        //依次寫入hashcode、訊息偏移量、時間戳、hash槽的值
        this.mappedByteBuffer.putInt(absIndexPos, keyHash);
        this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
        //將當前INDEX中包含的條目數量寫入HASH槽
        this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
        return true;
    }
    return false;
}
這樣構建完Index索引之後,根據Message Key 或 Unique Key查詢訊息就簡單了。 比如我們通過RocketMQ客戶端工具,根據Unique Key來查詢訊息。
adminImpl.queryMessageByUniqKey("order", "FD88E3AB24F6980059FDC9C3620464741BCC18B4AAC220FDFE890007");
在Broker端,通過Unique Key來計算Hash槽的位置,從而找到Index索引資料。從Index索引中拿到訊息的物理偏移量,最後根據訊息物理偏移量,直接到CommitLog檔案中去找就可以了。