解析RocketMQ的訊息索引檔案consumequeue
阿新 • • 發佈:2018-12-13
CommitLog的檔案結構
下圖展示了CommitLog的檔案結構,可以看到,包含了topic、queueId、訊息體等核心資訊。
同Kafka一樣,訊息是變長的,順序寫入。
如下圖所示:
ConsumeQueue的檔案結構
ConsumeQueue中並不需要儲存訊息的內容,而儲存的是訊息在CommitLog中的offset。也就是說,ConsumeQueue其實是CommitLog的一個索引檔案。
如下圖所示:
ConsumeQueue是定長的結構,每1條記錄固定的20個位元組。很顯然,Consumer消費訊息的時候,要讀2次:先讀ConsumeQueue得到offset,再讀CommitLog得到訊息內容。
解析程式碼
import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; public class TestCQ { public static void main(String[] args) throws IOException { decodeCQ(new File("D:\\consumeQueue\\0\\00000000000000000000")); } static void decodeCQ(File consumeQueue) throws IOException { FileInputStream fis = new FileInputStream(consumeQueue); DataInputStream dis = new DataInputStream(fis); long preTag = 0; long count = 1; while (true) { long offset = dis.readLong(); int size = dis.readInt(); long tag = dis.readLong(); if (size == 0) { break; } if ((tag - preTag) != 1) { // System.err.printf("%d: %d %d %d\n", count++, tag, size, // offset); System.out.printf("[ERROR]%d: %d %d %d\n", count++, tag, size, offset); } preTag = tag; System.out.printf("%d: %d %d %d\n", count++, tag, size, offset); } fis.close(); } }