1. 程式人生 > >【RocketMQ原始碼分析】深入訊息儲存(1)

【RocketMQ原始碼分析】深入訊息儲存(1)

![](https://antzyun.oss-cn-beijing.aliyuncs.com/img204d5b68da5e7e26d371c966fbf81d8.jpg) 最近在學習RocketMQ相關的東西,在學習之餘沉澱幾篇筆記。 RocketMQ有很多值得關注的設計點,訊息傳送、訊息消費、路由中心NameServer、訊息過濾、訊息儲存、主從同步、事務訊息等等。 本篇不需要你有使用RocketMQ的前置條件,完全從訊息儲存的直接實現上來分析RocketMQ的Store包。 ## 0.佇列檔案儲存思考 在開始之前,先來做一個簡單思考。 MQ既然要接收大量的訊息,這些訊息如果全部存在記憶體,是否可行呢?
![](https://antzyun.oss-cn-beijing.aliyuncs.com/imgD7594DD974DDD30994BA3161E0239CA5.jpg)
在機器記憶體的限制下當然不行,那麼就要考慮非記憶體的儲存方式。 資料庫? 聽上去很奇葩,但真有這麼實現的,ActiveMQ。 本地磁碟? 這個太慢了,如果我儲存的訊息也支援我順序讀,那還好,如果涉及到各種訊息特性,速度一定會慢很多。 本地磁碟雖然慢,但是它的容量很大。 記憶體容量雖然小,但是它的速度很快。 所以,能不能有一種折中的方法,即用到記憶體又用到本地磁碟。 我想出了一種`填充和交換演算法`,根據需要將固定大小(例如128M)的頁面檔案對映到記憶體中,並在固定的生存時間(TTL)時間內未訪問該檔案時取消對映。 通過這種設計,我不僅可以更安全,更有效地使用記憶體,而且可以在需要時刪除一些用過的頁面檔案以節省磁碟空間。 MQ顧名思義是訊息佇列。 佇列是一種前置讀、後置追加的結構,那麼只需要將佇列的前部分和後部分放入記憶體,中間部分在磁碟操作,就能保證高效、大容量操作。 讀取和追加操作總是可以發生在記憶體中,這意味著入隊和出隊操作總是接近O(1)訪問速度。 但如果想要讀取在磁碟上的資料,速度還是會降下來,為此,我們可以再維護一份索引,記錄目標訊息在磁碟檔案上的偏移量,以隨機讀介面去訪問。 至此,一個很實用的佇列檔案儲存系統就有眉目了。 ## 1.RocketMQ檔案儲存架構 RocketMQ會將訊息儲存到本地檔案,這樣我們的訊息都可以保證可查,也可以在系統故障時恢復。 不同於Kafka那種分割槽的儲存方式,RocketMQ是將訊息都儲存到CommitLog中,不會區分訊息的Topic、Group等資訊。 預設路徑在C盤使用者空間下的store目錄。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307142850.png) 每個CommitLog都有固定的大小。 所有生產者的訊息都會寫道CommitLog檔案中,因為在磁碟上,讀取起來並不是很快,所以還需要一個類似於索引的檔案。 這就是ConsumeQueue,也就是邏輯上的佇列。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307143339.png) ConsumeQueue中存放的CommitLog Offset是訊息在CommitLog中的偏移位置,也就是座標,保證能使用隨機讀來快速定位。 此外還有訊息的大小Size,也就是CommitLog指定位置我需要讀取多少個位元組。 最後是訊息Tag的雜湊值,Message Tag Hashcode,用來過濾訊息。 CommitLog是MQ接收到訊息後直接寫入本地檔案的,而ConsumeQueue這個類似CommitLog索引的結構,是非同步構建的,它是一種邏輯上的佇列結構。 去看本地的儲存檔案,可以發現ConsumeQueue的目錄結構很有意思。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307143830.png) 它是以Topic為主目錄,佇列id為次目錄的層級儲存的。 而ConsumeQueue的儲存單元根據我們上面那副圖的計算,可以得到每個訊息在ConsumeQueue中的大小是20Byte。 >
CommitLog Offset : 8 Byte > Size : 4 Byte > Message Tag Hashcode : 8 Byte ConsumeQueue上還有一些其他的資料,是消費者消費訊息的位置。 綜上,我們可以得到一張RocketMQ的儲存架構圖。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/imgcca07ff148c81614d56831c67ff28b9.jpg) ## 2.原始碼分析 有了CommitLog和ConsumeQueue的概念,我們可以簡單過一邊訊息儲存流程的程式碼了。 RocketMQ的檔案儲存部分程式碼不需要啟動NameServer、Broker等節點就可以單機執行,具體可以參考RocketMQ原始碼Store包下的Test用例。 第一步需要先例項化Store的配置。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307145025.png) 指定CommitLog、ConsumeQueue檔案的大小,以及刷盤機制是同步還是非同步,ConsumeQueue的非同步構建時間間隔。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307145249.png) 之後load、start即可,此處MessageStore例項是RocketMQ訊息儲存系統的核心類,DefaultMessageStore,其putMessage方法就是訊息寫入磁碟檔案的入口方法。 例項化DefaultMessageStore後,就可以構建一個簡單的訊息,使用putMessage方法寫入了。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307145636.png) 使用putMessage。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307145745.png) 此時就進入了`org.apache.rocketmq.store.DefaultMessageStore#putMessage`。 在這裡有六個前置校驗。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307150307.png) 1. 校驗DefaultMessageStore例項初始化狀態,在上一步messageStore進行start時,會完成CommitLog檔案channel的初始化等操作,所以此處需要校驗是否準備完畢。 2. 當前節點是否是從節點,只有主節點才對訊息進行儲存。 3. 是否可寫 4. Topic長度是否過長 5. 配置屬性是否過多 6. 作業系統分頁快取是否忙碌狀態 通過了上述六步校驗,就可以進行檔案儲存了。 下面是DefaultMessageStore#putMessage剩餘的過程,看上去很簡單。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307150829.png) 這裡主要是在呼叫CommitLog的putMessage前後記錄了耗時,並且如果耗時超過500毫秒就發出一次告警日誌,然後是StoreStatusService進行的一系列指標監控,StoreStatusService會記錄下Store一切操作的指標記錄,用於一些RocketMQ控制檯應用去展示節點狀態等資訊。 然後我們進入`org.apache.rocketmq.store.CommitLog#putMessage`。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307151513.png) 在開始,先是在訊息中寫入了時間戳,然後根據訊息內容計算了校驗碼,防止訊息被篡改。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307151651.png) 接著判斷了一下當前訊息是不是事務訊息,如果是事務訊息,則判斷是不是延遲訊息,這裡不做深入。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307151927.png) 接下來就到了RocketMQ訊息儲存的一個核心類了,MappedFile。 MappedFile就是我們最開始思考中的實現隨機讀的關鍵,它將記憶體和檔案建立了對映關係,每一個MappedFile都指向一個CommitLog,我只要知道訊息的位置(ConsumeQueue中儲存的有),就可以快速利用MappedFile去設定偏移位置讀取到該訊息,當我們需要寫入時,MappedFile中也儲存的有CommitLog當前以及寫到哪裡的記錄,只需要在把偏移位置設定到那裡,進行寫入即可。 總的來說,就是結合了順序寫,隨機讀,保證讀寫都是高效的O(1)。 當然我們不止一個CommitLog,那麼也不止一個MappedFile例項,為此,有一個MappedFileQueue來儲存它,下圖是我本地Debug時MappedFileQueue例項的內容。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307152806.png) 因為我本地訊息寫入較少,完全還沒有用到第二個CommitLog,所以Queue中只有一個MappedFile,而這個MappedFile也指向一個CommitLog。 我們在將訊息寫入CommitLog時,需要拿到最靠後的那個CommitLog,因為寫入操作只會在最後面新增訊息。 所以此處我們需要呼叫this.mappedFileQueue.getLastMappedFile()來獲取最後一個MappedFile。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307153111.png) 有了MappedFile,我們就要開始真正對磁碟檔案進行操作,此時需要上鎖,官方給的註釋說此處是自旋鎖or可重入鎖都可以,當然需要看場景,如果你的機器CPU強大,當然推薦自旋鎖了。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307153311.png) 在try-catch中,再次對訊息的儲存時間戳進行了設定(還記得前面已經設定過一次了嗎),然後判斷你拿到的mappedFile是不是null,或者mappedFile是不是已經滿了,如果是這樣,如下圖,就需要重新建立一個CommitLog了。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307153548.png) 如果建立之後還是null,就返回寫入失敗。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307153629.png) 接著,就要呼叫MappedFile的appendMessage方法去向磁碟檔案寫入訊息了。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307153813.png) 這裡返回結果也會去校驗status,根據不同的status去包裝不同的返回結果。 在進入MappedFile#appendMessage之前,讓我們把CommitLog的putMessage方法看完,看看寫入成功後幹了什麼。 震驚!居然是主從同步和刷盤操作!這裡不進去看程式碼了,但是瞭解一下相關概念。 主從同步中該節點會把自己CommitLog的內容同步給其他節點,這裡可以看到,每次有新訊息都會去同步,是實時性的。 刷盤,MappedFile雖然寫入了,但其實是位於Buffer中的,只有刷一次才會到本地磁碟,至於刷盤是同步還是非同步刷,非同步多久刷,就要看你store配置了(記得最開始例項化DefaultMessageStore時候的配置嗎) 扯完這些,就可以進入MappedFile#appendMessage一探究竟了。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307154639.png) 先看看概覽,首先是拿到了CommitLog目前寫入了多少,作為當前寫入位置,然後於CommitLog檔案大小比較,如果大於,那肯定有問題了,會報錯,如果小於,就開始接下來的步驟。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307154840.png) 首先拿到了MappedFile對於CommitLog操作的ByteBuffer,這不就簡單了,寫就完事兒了。 接著設定ByteBuffer要操作的位置。 下面有兩種操作方法,一個是單個訊息寫入,一個是批量,我們來看單個訊息寫入的,doAppend。 doAppend傳入的引數有fileFromOffset,你可以理解它就是CommitLog的名字,因為CommitLog檔名本身就是儲存的偏移,只是分成多個檔案而已,ByteBuffer, 還有一個檔案大小減去當前偏移位置的值,可以理解為當前CommitLog還可以寫入多少,最後一個引數是訊息內容。 ```java cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); ``` doAppend方法位於CommitLog中,這裡是本次訊息寫入原始碼分析的最後一個函數了,在這個方法中,主要是對訊息進行序列化操作,以及buffer的寫入。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307155542.png) 首先,根據buffer的偏移和CommitLog檔案的偏移,計算出這個訊息真正的下表位置,這個值只會出現在ConsumeQueue,因為ConsumeQueue儲存的是相對於所有CommitLog的偏移,而不是在單個CommitLog檔案中的偏移,因此需要加上CommitLog本身的偏移位置。 接著是訊息id,根據MessageDecoder建立即可,這個id會儲存在記憶體,是一個Topic和Topic下的佇列id組成的Table。 具體實現。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307155957.png) 再接著是一大段對訊息配置序列化的操作。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307160039.png) 序列化完成後,我們有訊息的長度,就知道這條訊息在CommitLog中佔有的空間大小了,因此需要判斷一下夠不夠塞。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307160254.png) 這裡if的邏輯可以理解為訊息的大小加上CommitLog的檔案終止符需要小於CommitLog檔案的剩餘空間。 這個檔案終止符其實就是魔數,UNIX系統檔案中隨處可見,作業系統啟動引導中也是,可以理解為用一個特殊的數在磁碟中做空間區分的。 ```java // File at the end of the minimum fixed length empty private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; ``` 如果大小不夠就返回錯誤。 如果空間足夠,就開始寫入訊息,寫入訊息的格式可以參考下圖CommitLog的Message規範。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/imgb2d2abca687aa407da9ea48a0148df8.jpg) 對比上圖,訊息寫入規範完全符合。 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307161018.png) ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307161203.png) 最後,將訊息放入buffer中,返回成功 ![](https://antzyun.oss-cn-beijing.aliyuncs.com/img20210307161235.png) 綜上,訊息寫入就完成了,不過此時訊息只是寫入了Buffer,還需要等後續刷盤才會持久化到