RocketMQ存儲系統概要設計和源碼解讀
阿新 • • 發佈:2019-03-24
mage 列表 rabbitmq 文件 內存映射 ssa 執行 所有 拷貝
普遍消息存儲技術的選型
- 分布式KV存儲
- NewSQL存儲:TiDB 文件系統:RocketMQ,kafka,RabbitMQ
- RocketMQ:所有的message存儲在一個log裏,不區分topic-queue
- kafka:一個log文件存儲單個topic-queue
RocketMQ實現消息存儲
CommitLog
下圖為CommitLog結構:MappedFile(真實的映射文件)組成MappedQueue構成
存儲消息CommitLog.putMessage()
- 獲取最近一個CommitLog的內存映射文件(零拷貝)
- MappedFileQueue.getLastMappedFile():從其維護的列表中獲取最後一個,因為之前的都已經寫滿了
- MappedFileQueue.load():構建一個MappedFile,加入到列表中
- 如果最近的CommitLog文件寫滿了或者broker剛啟動,mappedfile是空的,創建一個新的
- MappedFileQueue.getLastMappedFile(create:true)
- 計算要創建的CommitLog的起始偏移量(即映射文件的名)
- allocateMappedFileService.putRequestAndReturnMappedFile()創建兩個映射文件,但其實只要創建好第一個就返回了;
- 兩種創建MappedFile方式(內存映射):堆外內存池,直接創建TODO
- 創建好後,對當前映射文件進行預熱MappedFile.warmMappedFile:
- 對當前映射文件的每個內存頁寫入一個字節ByteBuffer,當刷盤策略為同步刷盤時,執行強制刷盤,每修改pages個分頁刷一次盤;
- 因為對每個Mappedfile寫入假字節的時候是通過循環的形式,而寫入次數為MappedFile.size(1024M) / pagesize(4k),這樣占有CPU的時間太久,所以線程會主動休眠,進入就緒狀態,釋放CPU
- MappedFile.mlock()將當前映射文件全部的地址空間鎖定在物理存儲中,防止被交換到swap空間
- 把broker內部的這個message刷新到Mappedfile的內存中MappedFileQueue->MappedFile.put()
- 獲取消息存儲的絕對物理位置
- 同步刷盤GroupCommitService
- 異步刷盤CommitRealTimeService/FlushCommitLogService
- 主從同步
下圖為CommitLog存儲消息的流程:
TODO
ConsumerQueue/Index
是消息存儲的索引文件:內存存儲以topic(目錄)/隊列id(目錄)/MappedFile.....
從CommitLog中拿到message
CommitLog寫入message的時候,異步構建consumerqueue存儲消息索引提供消費者消費
當topic數量增多時,kafka的單個broker的TPS降低了1個數量級,而RocketMQ在海量topic的場景下,依然保持較高的TPS?CommitLog的”隨機讀”對性能的影響
RocketMQ存儲系統概要設計和源碼解讀