1. 程式人生 > >RocketMQ存儲系統概要設計和源碼解讀

RocketMQ存儲系統概要設計和源碼解讀

mage 列表 rabbitmq 文件 內存映射 ssa 執行 所有 拷貝

普遍消息存儲技術的選型

  • 分布式KV存儲
  • NewSQL存儲:TiDB
  • 文件系統:RocketMQ,kafka,RabbitMQ
    • RocketMQ:所有的message存儲在一個log裏,不區分topic-queue
    • kafka:一個log文件存儲單個topic-queue

RocketMQ實現消息存儲

CommitLog

下圖為CommitLog結構:MappedFile(真實的映射文件)組成MappedQueue構成

技術分享圖片

存儲消息CommitLog.putMessage()

  • 獲取最近一個CommitLog的內存映射文件(零拷貝)
    • MappedFileQueue.getLastMappedFile():從其維護的列表中獲取最後一個,因為之前的都已經寫滿了
    • MappedFileQueue.load():構建一個MappedFile,加入到列表中
  • 如果最近的CommitLog文件寫滿了或者broker剛啟動,mappedfile是空的,創建一個新的
    • MappedFileQueue.getLastMappedFile(create:true)
    • 計算要創建的CommitLog的起始偏移量(即映射文件的名)
    • allocateMappedFileService.putRequestAndReturnMappedFile()創建兩個映射文件,但其實只要創建好第一個就返回了;
      • 兩種創建MappedFile方式(內存映射):堆外內存池,直接創建TODO
      • 創建好後,對當前映射文件進行預熱MappedFile.warmMappedFile:
      1. 對當前映射文件的每個內存頁寫入一個字節ByteBuffer,當刷盤策略為同步刷盤時,執行強制刷盤,每修改pages個分頁刷一次盤;
      2. 因為對每個Mappedfile寫入假字節的時候是通過循環的形式,而寫入次數為MappedFile.size(1024M) / pagesize(4k),這樣占有CPU的時間太久,所以線程會主動休眠,進入就緒狀態,釋放CPU
      3. MappedFile.mlock()將當前映射文件全部的地址空間鎖定在物理存儲中,防止被交換到swap空間
  • 把broker內部的這個message刷新到Mappedfile的內存中MappedFileQueue->MappedFile.put()
    • 獲取消息存儲的絕對物理位置
  • 同步刷盤GroupCommitService
  • 異步刷盤CommitRealTimeService/FlushCommitLogService
  • 主從同步

下圖為CommitLog存儲消息的流程:

技術分享圖片

TODO

ConsumerQueue/Index

是消息存儲的索引文件:內存存儲以topic(目錄)/隊列id(目錄)/MappedFile.....

從CommitLog中拿到message

CommitLog寫入message的時候,異步構建consumerqueue存儲消息索引提供消費者消費

當topic數量增多時,kafka的單個broker的TPS降低了1個數量級,而RocketMQ在海量topic的場景下,依然保持較高的TPS?CommitLog的”隨機讀”對性能的影響

RocketMQ存儲系統概要設計和源碼解讀