C#分散式訊息佇列 EQueue 2.0 釋出啦
前言
最近花了我幾個月的業餘時間,對EQueue做了一個重大的改造,訊息持久化採用本地寫檔案的方式。到現在為止,總算完成了,所以第一時間寫文章分享給大家這段時間我所積累的一些成果。
昨天,我寫過一篇關於EQueue 2.0效能測試結果的文章,有興趣的可以看看。
為什麼要改為檔案儲存?
SQL Server的問題
之前EQueue的訊息持久化是採用SQL Server的。一開始我覺得沒什麼問題,採用的是非同步定時批量持久化,使用SqlBulkCopy的方法,這個方法測試下來,批量插入訊息的效能還不錯,就決定使用了。一開始我並沒有在使用到EQueue後做整合的效能測試。在功能上確實沒什麼問題了。而且使用DB持久化也有很多好處,比如訊息查詢很簡單,DB天生支援各種方式的查詢。刪除訊息也非常簡單,一條DELETE語句即可。所以功能實現比較順利。但後來當我對EQueue做效能測試時,發現一些問題。當資料庫伺服器和Broker本身部署在不同的伺服器上時,持久化訊息也會走網絡卡,消耗頻寬,影響訊息的傳送和消費的TPS。而如果資料庫伺服器部署在Broker同一臺伺服器上,則因為SQLServer本身也會消耗CPU以及記憶體,也會影響Broker的訊息傳送和消費的TPS。另外SqlBulkCopy的速度,再本身機器正在接收大量的傳送訊息和拉取訊息的請求時,會不太穩定。經過一些測試,發現整個EQueue Broker的效能不太理想。然後又想想,Broker伺服器有有一個硬體一直沒有好好利用起來,那就是硬碟。假設我們的訊息是持久化到本地硬碟的,順序寫檔案,就應該能解決SQL Server的問題了。所以,開始調研如何實現檔案持久化訊息的方案了。
訊息快取在託管記憶體的GC的問題
之前訊息儲存在SQL Server,如果消費者每次讀取訊息時,總是從資料庫去讀取,那對資料庫就是不斷的寫入和讀取,效能不太理想。所以當初的思路是,儘量把最近可能要被消費的訊息快取在本地記憶體中。當初的做法是設計了一個很大的ConcurrentDictionary<long, Message>,這個字典就是存放了所有可能會被消費的訊息。如果要消費的訊息當前不在這個字典裡,就批量從DB拉取一批出來消費。這個設計可以儘可能的避免讀取DB的情況。但是帶來了另一個問題。就是我們對這個字典在高併發不斷的寫入和讀取。且這個字典裡快取的訊息又很多,到到達幾百上千萬時,GC的壓力過大,導致很多執行緒都會被阻塞。嚴重影響Broker的TPS。
所以,基於上面的兩個主要原因,我想到了兩個思路來解決:1)採用寫檔案的方式來持久化訊息;2)使用非託管記憶體來快取將要被消費的訊息;下面我們來看看這兩個設計的一些關鍵問題的設計思路。
檔案儲存的關鍵問題設計
心路背景
之前一直無法駕馭寫檔案的設計。因為精細化的將資料寫入檔案,並能要精確的讀取想要的資料,實在沒什麼經驗。之前雖然也知道阿里的RocketMQ的訊息持久化也是採用順序寫檔案的方式的,但是看了程式碼,發現設計很複雜,一下子也比較難懂。嘗試看了多次也無法完全理解。所以一直無法掌握這種方式。有一天不經意間想到之前看過的EventStore這個開源專案中,也有寫檔案的設計。這個專案是CQRS架構之父greg young所主導的開源專案,是一個專門為ES(Event Sourcing)設計模式中提供儲存事件流支援的事件流儲存系統。於是下定決心專研其原始碼,看C#程式碼肯定還是比Java容易,呵呵。經過一段時間的摸索之後,基本學到了它是如何寫檔案以及如何讀檔案的。瞭解了很多設計思路。然後,在看懂了EventStore的檔案儲存設計之後,再去看RocketMQ的檔案持久化的設計,發現驚人的相似。原來看不懂的程式碼現在也能看懂了,因為思路差不多的。所以,這給我開始動手提供了很大的信心。經過自己的一些準備(檔案讀寫的效能驗證)和設計思路整理後,終於開始動手了。
如何寫訊息到檔案?
其實說出來也很簡單。之前一直以為寫檔案就是一個訊息一行唄。這樣當我們要找哪個訊息時,只需要知道行號即可。確實,理論上這樣也挺好。但上面這兩個開源專案都不是這樣做的,而是都是採用更精細化的直接寫二進位制的方式。搞清楚寫入的格式之後,還要考慮一個檔案寫不下的時候怎麼辦?因為一個檔案總是有大小的,比如1G,那超過1G後,必然要建立新的檔案,然後把訊息寫入新的檔案。所以,我們就又有了Chunk的概念。一個Chunk就是一個檔案,假設我們現在實現了一個FileMessageStore,表示對檔案持久化的封裝,那這個FileMessageStore肯定維護了一堆的Chunk。然後我們也很容易想到一點,就是Chunk有3種狀態:1)New,表示剛建立的Chunk,這種Chunk我們可以寫入新訊息進去;2)Completed,已寫入完成的Chunk,這種Chunk是隻讀的;3)OnGoing的Chunk,就是當FileMessageStore初始化時,要從磁碟的某個chunk的目錄下載入所有的Chunk檔案,那不難理解,最後一個檔案之前的Chunk檔案應該都是Completed的;最後一個Chunk檔案可能寫入了一半,就是之前沒完全用完的。所以,本質上New和Ongoing的Chunk其實是一樣的,只是初始化的方式不同。
至此,我們知道了寫檔案的兩個關鍵思路:1)按二進位制寫;2)拆分為Chunk檔案,且每個Chunk檔案有狀態;按二進位制寫主要的思路是,假如我們當前要寫入的訊息的二進位制陣列大小為100個位元組,也就是說訊息的長度為100,那我們可以先把訊息的長度寫入檔案,再接著寫入訊息本身。這樣我們讀取訊息時,只要知道了寫入訊息長度時的那個Position,就能先讀取到訊息的長度,然後就能知道接下來要讀取多少位元組為訊息內容。從而能正確讀取訊息出來。
另外再分享一點,EventStore中,寫入一個事件到檔案中時,還會在寫入訊息內容後再寫入這個訊息的長度到檔案裡。也就是說,寫入一個數據到檔案時,會在頭尾都寫入該資料的長度。這樣做的好處是什麼呢?就是當我們想從後往前讀資料時,也能方便的做到,因為每個資料的前後都記錄了該資料的長度。這點應該不難理解吧?而EventStore是一個面向流的儲存系統,我們對事件流確實可能從前往後讀,也可能是從後往前讀。另外這個設計還有一個好處,就是起到了校驗資料合法性的目的。當我們根據長度讀取資料後,再資料之後再讀取一個長度,如果這兩個長度一致,那資料應該就沒問題的。在RocketMQ中,是通過CRC校驗的方式來保證讀取的資料沒有問題。我個人還是比較喜歡EventStore的做法。所以EQueue裡現在寫入資料就是這樣做的。
上面我介紹了一種寫入不定長資料到檔案的設計思路,這種設計是為了解決寫入訊息到檔案的情況,因為訊息的長度是不定的。在EQueue中,我們還有一另一種寫檔案的場景。就是佇列資訊的持久化。EQueue的架構是一個Topic下有多個Queue,每個Queue裡有很多訊息,消費者負載均衡是通過給消費者分配均勻數量的Queue的方式來達到的。這樣我們只要確保寫入Queue的訊息是均勻的,那每個Consumer消費到的訊息數就是均勻的。那一個Queue裡記錄的是什麼呢?就是一個訊息和其在佇列的位置的對應關係。假設訊息寫入在檔案的物理位置為10000,然後這個訊息在Queue裡的索引是100,那這個佇列就會把這兩個位置對應起來。這樣當我們要消費這個Queue中索引為100的訊息時,就能找到這個訊息在檔案中的物理位置為10000,就能根據這個位置找到訊息的內容了。如果是託管記憶體,我們只需要弄一個Dictionary,key是訊息在佇列中的Offset,value是訊息在檔案中的物理Offset即可。這樣我們有了這個dict,就能輕鬆建立起對應關係了。但上面我說過,這種巨大的dict是要佔用記憶體的,會有GC的問題。所以更好的辦法是,把這個對應關係也寫入檔案,那怎麼做呢?這時就又需要更精細化的設計了。想到了其實也很簡單,這個設計我是從RocketMQ中學到的。就是我們設計一種固定長度的結構體,這個結構體裡就存放一個數據,就是訊息在檔案的物理位置(為了後面好表達,我命名為MessagePosition),一個Long值,一個Long的長度是8個位元組。也就是說,這個檔案中,每個寫入的資料的長度都是8個位元組。假設我們一個檔案要儲存100W個MessagePosition。那這個檔案的長度就是100W * 8這麼多位元組,大概為7.8MB。那麼這樣做有什麼好處呢?好處就是,假如我們現在要消費這個Queue裡的第一個訊息,那這個訊息的MessagePosition在這個檔案中的位置0,第二個訊息在這個檔案中的位置是8,第三個就是16,以此類推,第N 個訊息就是(N-1) * 8。也就是說,我們無須顯式的把訊息在佇列中的位置資訊也寫入到檔案,而是通過這樣的固定演算法,就能精確的算出Queue中某個訊息的MessagePosition是寫入在檔案的哪個位置。然後拿到了MessagePosition之後,就能從Message的Chunk檔案中讀取到這個訊息了。
通過上面的分析,我們知道了,Producer傳送一個訊息到Broker時,Broker會寫兩次磁碟。一次是現將訊息本身寫入磁碟(Message Chunk裡),另一次是將訊息的寫入位置寫入到磁碟(Queue Chunk裡)。細心的朋友可能會問,假如我第一次寫入成功,但第二次寫入時失敗,比如正好機器斷電或者當前Broker伺服器正好出啥問題 了,沒有寫入成功。那怎麼辦呢?這個沒有什麼大的影響。因為首先這種情況會被認為是訊息傳送失敗。所以Producer還會重新發送該訊息,然後Broker收到訊息後還會再做一次這兩個寫入操作。也就是說,第一次寫入的訊息內容永遠也不會用到了,因為那個寫入位置永遠也不會在Queue Chunk裡有記錄。
下面的程式碼展示了寫訊息到檔案的核心程式碼:
//訊息寫檔案需要加鎖,確保順序寫檔案 MessageStoreResult result = null; lock (_syncObj) { var queueOffset = queue.NextOffset; var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message); queue.AddMessage(messageRecord.LogPosition, message.Tag); queue.IncrementNextOffset(); result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag); }
StoreMessage方法內部實現:
public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message) { var record = new MessageLogRecord( message.Topic, message.Code, message.Body, queueId, queueOffset, message.CreatedTime, DateTime.Now, message.Tag); _chunkWriter.Write(record); return record; }
queue.AddMessage方法的內部實現:
public void AddMessage(long messagePosition, string messageTag) { _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2())); }
ChunkWriter的內部實現:
public long Write(ILogRecord record) { lock (_lockObj) { if (_isClosed) { throw new ChunkWriteException(_currentChunk.ToString(), "Chunk writer is closed."); } //如果當前檔案已經寫完,則需要新建檔案 if (_currentChunk.IsCompleted) { _currentChunk = _chunkManager.AddNewChunk(); } //先嚐試寫檔案 var result = _currentChunk.TryAppend(record); //如果當前檔案已滿 if (!result.Success) { //結束當前檔案 _currentChunk.Complete(); //新建新的檔案 _currentChunk = _chunkManager.AddNewChunk(); //再嘗試寫入新的檔案 result = _currentChunk.TryAppend(record); //如果還是不成功,則報錯 if (!result.Success) { throw new ChunkWriteException(_currentChunk.ToString(), "Write record to chunk failed."); } } //如果需要同步刷盤,則立即同步刷盤 if (_chunkManager.Config.SyncFlush) { _currentChunk.Flush(); } //返回資料寫入位置 return result.Position; } }
當然,我上面為了簡化問題的複雜度。所以沒有引入關於如何根據某個全域性的MessagePosition找到其在哪個Message Chunk的問題。這個其實也很好做,我們首先固定好每個Message Chunk檔案的大小。比如大小為256MB,然後我們為每個Chunk檔案設計一個ChunkHeader,每個Chunk檔案總是先把這個ChunkHeader寫入檔案,這個Header裡記錄了這個檔案的起始位置和結束位置,以及檔案的大小。這樣我們根據某個MessagePosition計算其在哪個Chunk檔案時,只需要把這個MessagePositon對Chunk的大小做取摸操作即可。根據資料的位置找其在哪個Chunk的程式碼看起來如下面這樣這樣:
public Chunk GetChunkFor(long dataPosition) { var chunkNum = (int)(dataPosition / _config.GetChunkDataSize()); return GetChunk(chunkNum); } public Chunk GetChunk(int chunkNum) { if (_chunks.ContainsKey(chunkNum)) { return _chunks[chunkNum]; } return null; }
程式碼很簡單,就不多講了。拿到了Chunk物件後,我們就可以把dataPosition傳給Chunk,然後Chunk內部把這個全域性的dataPosition轉換為本地的一個位置,就能準確的定位到這個資料在當前Chunk檔案的實際位置了。將全域性位置轉換為本地的位置的演算法也很簡單直接:
public int GetLocalDataPosition(long globalDataPosition) { if (globalDataPosition < ChunkDataStartPosition || globalDataPosition > ChunkDataEndPosition) { throw new Exception(string.Format("globalDataPosition {0} is out of chunk data positions [{1}, {2}].", globalDataPosition, ChunkDataStartPosition, ChunkDataEndPosition)); } return (int)(globalDataPosition - ChunkDataStartPosition); }
只需要把這個全域性的位置減去當前Chunk的資料開始位置,就能知道這個全域性位置相對於當前Chunk的本地位置了。
好了,上面介紹了訊息如何寫入的主要思路以及如何讀取資料的思路。
另外一點還想提一下,就是關於刷盤的策略。一般我們寫資料到檔案後,是需要呼叫檔案流的Flush方法的,確保資料最終刷入到了磁碟上。否則資料就還是在緩衝區裡。當然,我們需要注意到,即便呼叫了Flush方法,資料可能也還沒真正邏輯到磁碟,而只是在作業系統內部的緩衝區裡。這個我們就無法控制了,我們能做到的是呼叫了Flush方法即可。那當我們每次寫入一個數據到檔案都要呼叫Flush方法的話,無疑效能是低下的,所以就有了所謂的非同步刷盤的設計。就是我們寫入訊息後不立即呼叫Flush方法,而是採用一個獨立的執行緒,定時呼叫Flush方法來實現刷盤。目前EQueue支援同步刷盤和非同步刷盤,開發者可以自己配置決定採用哪一種。非同步刷盤的間隔預設是100ms。當我們在追求高吞吐量時,應該考慮非同步刷盤,但要求資料可靠性更高但對吞吐量可以低一點時,則可以使用同步刷盤。如果又要高吞吐又要資料高可靠,那就只有一個辦法了,呵呵。就是多增加一些Broker機器,通過叢集來彌補單臺Broker寫入資料的瓶頸。
如何從檔案讀取訊息?
假設我們現在要從一個檔案讀取資料,且是多執行緒併發的讀取,要怎麼設計?一個辦法是,每次讀取時,建立檔案流,然後建立StreamReader,然後讀取檔案,讀取完成後釋放StreamReader並關閉檔案流。但每次要讀取檔案的一個數據都要這樣做的話效能不是太好,因為我們反覆的建立這樣的物件。所以,這裡我們可以使用物件池的概念。就是Chunk內部,預先建立好一些Reader,當需要讀檔案時,獲取一個可用的Reader,讀取完成後,再把Reader歸還到物件池裡。基於這個思路,我設計了一個簡單的物件池:
private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>(); private void InitializeReaderWorkItems() { for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++) { _readerWorkItemQueue.Enqueue(CreateReaderWorkItem()); } _isReadersInitialized = true; } private ReaderWorkItem CreateReaderWorkItem() { var stream = default(Stream); if (_isMemoryChunk) { stream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength); } else { stream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None); } return new ReaderWorkItem(stream, new BinaryReader(stream)); } private ReaderWorkItem GetReaderWorkItem() { ReaderWorkItem readerWorkItem; while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem)) { Thread.Sleep(1); } return readerWorkItem; } private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem) { _readerWorkItemQueue.Enqueue(readerWorkItem); }
當一個Chunk初始化時,我們預先初始化好固定數量(可配置)的Reader物件,並把這些物件放入一個ConcurrentQueue裡(物件池的作用),然後要讀取資料時,從從ConcurrentQueue裡拿一個可用的Reader即可,如果當前併發太高拿不到怎麼辦,就等待直到拿到為止,目前我是等待1ms後繼續嘗試拿,直到最後拿到為止。然後ReturnReaderWorkItem就是資料讀取完之後歸還Reader到物件池。就是不是很簡單哦。這樣的設計,可以避免不斷的建立檔案流和Reader物件,可以避免GC的副作用。
Broker重啟時如何做?
大家知道,當Broker重啟時,我們是需要掃描磁碟上Chunk目錄下的所有Chunk檔案的。那怎麼掃描呢?上面其實我也簡單提到過。首先,我們可以對每個Chunk檔案的檔名的命名定義一個規則,第一個Chunk檔案的檔名比如為:message-chunk-000000000,第二個為:message-chunk-000000001,以此類推。那我們掃描時,只要先把所有的檔名獲取到,然後對檔名升序排序。那最後一個檔案之前的檔案肯定都是寫入完全了的,即上面我說的Completed狀態的,而最後一個檔案是還沒有寫入完的,還可以接著寫。所以我們初始化時,只需要先初始化最後一個之前的所有Chunk檔案,最後再初始化最後一個檔案即可。這裡我所說的初始化不是要把整個Chunk檔案的內容都載入到記憶體,而是隻是讀取這個檔案的ChunkHeader的資訊維護在記憶體即可。有了Header資訊,我們就可以為後續的資料讀取提供位置計算了。所以,整個載入過程是很快的,讀取100個Chunk檔案的ChunkHeader也不過一兩秒的時間,完全不影響Broker的啟動時間。對於初始化Completed的Chunk比較簡單,只需要讀取ChunkHeader資訊即可。但是初始化最後一個檔案比較麻煩,因為我們還要知道這個檔案當前寫入到哪裡了?從而我們可以從這個位置的下一個位置接著往下寫。那怎麼知道這個檔案當前寫入到哪裡了呢?其實比較複雜。有很多技術,我看到RocketMQ和EventStore這兩個開源專案中都採用了Checkpoint的技術。就是當我們每次寫入一個數據到檔案後,都會更新一下Checkpoint,即表示當前寫入到這個檔案的哪裡了。然後這個Checkpoint值我們也是定時非同步儲存到某個獨立的小檔案裡,這個檔案裡只儲存了這個Checkpoint。這樣的設計有一個問題,就是假如資料寫入了,但由於Checkpoint的儲存不是實時的,所以理論上會出現Checkpint值會小於實際檔案寫入的位置的情況。一般我們忽略這種情況即可,即可能會存在初始化時,下次寫入可能會覆蓋一定的之前已經寫入的資料,因為Checkpoint可能是稍微老一點的。
而我在設計時,希望能再嚴謹一點,取消Checkpoint的設計,而是採用在初始化Ongoing狀態的Chunk檔案時,從檔案的頭開始不斷往下讀,當最後無法往下讀時,我們就知道這個檔案我們當前寫入到哪裡了。那怎麼知道無法往下讀了呢?也就是說怎麼確定後續的檔案內容不是我們寫入的?也很簡單。對於不固定資料長度的Chunk來說,由於我們每次寫入一個數據時都是同時在前後寫入這個資料的長度;所以我們再初始化讀取這個檔案時,可以藉助這一點來校驗,但出現不符合這個規則的資料時,就認為後續不是正常的資料了。對於固定長度的Chunk來說,我們只要保證每次寫入的資料的資料是非0了。而對於EQueue的場景,固定資料的Chunk裡儲存的都是訊息在Message Chunk中的全域性位置,一個Long值;但這個Long值我們正常是從0開始的,怎麼辦呢?很簡單,我們寫入MessagePosition時,總是加1即可。即假如當前的MessagePosition為0,那我們實際寫入1,如果為100,則實際寫入的值是101。這樣我們就能確保這個固定長度的Chunk檔案裡每個資料都是非0的。然後我們在初始化這樣的Chunk檔案時,只要不斷讀取固定長度(8個位元組)的資料,當出現讀取到的資料為0時,就認為已經到頭了,即後續的不是我們寫入的資料了。然後我們就能知道接下來要從哪裡開始讀取了哦。
如何儘量避免讀檔案?
上面我介紹瞭如何讀檔案的思路。我們也知道了,我們是在消費者要消費訊息時,從檔案讀取訊息的。但對從檔案讀取訊息總是沒有比從記憶體讀取訊息來的快。我們前面的設計都沒有把記憶體好好利用起來。所以我們能否考慮把未來可能要消費的Chunk檔案的內容直接快取在記憶體呢?這樣我們就可以避免對檔案的讀取了。肯定可以的。那怎麼做呢?前面我提高多,曾經我們用託管記憶體中的ConcurrentDictionary<long, Message>這樣的字典來快取訊息。我也提到這會帶來垃圾回收而影響效能的問題。所以我們不能直接這樣簡單的設計。經過我的一些嘗試,以及從EventStore中的原始碼中學到的,我們可以使用非託管記憶體來快取Chunk檔案。我們可以使用Marshal.AllocHGlobal來申請一塊完整的非託管記憶體,然後再需要釋放時,通過Marshal.FreeHGlobal來釋放。然後,我們可以通過UnmanagedMemoryStream來訪問這個非託管記憶體。這個是核心思路。那麼怎樣把一個Chunk檔案快取到非託管記憶體呢?很簡單了,就是掃描這個檔案的所有內容,把內容都寫入記憶體即可。程式碼如下:
private void LoadFileChunkToMemory() { using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None)) { var cachedLength = (int)fileStream.Length; var cachedData = Marshal.AllocHGlobal(cachedLength); try { using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite)) { fileStream.Seek(0, SeekOrigin.Begin); var buffer = new byte[65536]; int toRead = cachedLength; while (toRead > 0) { int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length)); if (read == 0) { break; } toRead -= read; unmanagedStream.Write(buffer, 0, read); } } } catch { Marshal.FreeHGlobal(cachedData); throw; } _cachedData = cachedData; _cachedLength = cachedLength; } }
程式碼很簡單,不用多解釋了。需要注意的是,上面這個方法針對的是Completed狀態的Chunk,即已經寫入完成的Chunk的。已經寫入完全的Chunk是隻讀的,不會再發生更改,所以我們可以隨便快取在記憶體中。
那對於新創建出來的Chunk檔案呢?正常情況下,消費者來得及消費時,我們總是在不斷的寫入最新的Chunk檔案,也在不斷的從這個最新的Chunk檔案讀取訊息。那我們怎麼確保消費最新的訊息時,也不需要從檔案讀取呢?也很簡單,就是在新建一個Chunk檔案時,如果記憶體足夠,也同時建立一個一樣大小的基於非託管記憶體的Chunk。然後我們再寫入訊息到檔案Chunk成功後,再同時寫入這個訊息到非託管記憶體的Chunk。這樣,我們在消費訊息,讀取訊息時總是首先判斷當前Chunk是否關聯了一個非託管記憶體的Chunk,如果有,就優先從記憶體讀取即可。如果沒有才從檔案Chunk讀取。
但是從檔案讀取時,可能會遇到一個問題。就是我們剛寫入到檔案的資料可能無法立即讀取到。因為寫入的資料沒有立即刷盤,所以無法通過Reader讀取到。所以,我們不能僅通過判斷當前寫入的位置來判斷當前是否還有資料可以被讀取,而是考慮當前的最後一次刷盤的位置。理論上只能讀取刷盤之前的資料。但即便這樣設計了,在如果當前硬碟不是SSD的情況下,好像也會出現讀不到資料的問題。偶爾會報錯,有朋友在測試時已經遇到了這樣的問題。那怎麼辦呢?我想了一個辦法。因為這種情況歸根接地還是因為我們邏輯上認為已經寫入到檔案的資料由於未及時刷盤或者作業系統本身的內部快取的問題,導致資料未能及時寫入磁碟。出現這種情況一定是最近的一些資料。所以我們如果能夠把比如最近寫入的10000(可配置)個數據都快取在本地託管記憶體中,然後讀取時先看本地快取的託管記憶體中有沒有這個位置的資料,如果有,就不需要讀檔案了。這樣就能很好的解決這個問題了。那怎麼確保我們只快取了最新的10000個數據且不會超出10000個呢?答案是環形佇列。這個名字聽起來很高大上,其實就是一個數組,陣列的長度為10000,然後我們在寫入資料時,我們肯定知道這個資料在檔案中的位置的,我們可以把這個位置(一個long值)對10000取摸,就能知道該把這個資料快取在這個陣列的哪個位置了。通過這個設計確保快取的資料不會超過10000個,且確保一定只快取最新的資料,如果新的資料儲存到陣列的某個下標時,該下標已經存在以前已經儲存過的資料了,就自動覆蓋掉即可。由於這個陣列的長度不是很長,所以每什麼GC的問題。
但是光這樣還不夠,我們這個陣列中的每個元素至少要記錄這個元素對應的資料在檔案中的位置。這個是為了我們在從陣列中獲取到資料後,進一步校驗這個資料是否是我想要的那個位置的資料。這點大家應該可以理解的吧。下面這段程式碼展示瞭如何從環形陣列中讀取想要的資料:
if (_cacheItems != null) { var index = dataPosition % _chunkConfig.ChunkLocalCacheSize; var cacheItem = _cacheItems[index]; if (cacheItem != null && cacheItem.RecordPosition == dataPosition) { var record = readRecordFunc(cacheItem.RecordBuffer); if (record == null) { throw new ChunkReadException( string.Format("Cannot read a record from data position {0}. Something is seriously wrong in chunk {1}.", dataPosition, this)); } if (_chunkConfig.EnableChunkReadStatistic) { _chunkStatisticService.AddCachedReadCount(ChunkHeader.ChunkNumber); } return record; } }
_cacheItems是當前Chunk內的一個環形陣列,然後假如當前我們要讀取的資料的位置是dataPosition,那我們只需要先對環形資料的長度取摸,得到一個下標,即上面程式碼中的index。然後就能從陣列中拿到對應的資料了,然後如果這個資料存在,就進一步判斷這個資料dataPosition是否和要求的dataPosition,如果一致,我們就能確定這個資料確實是我們想要的資料了。就可以返回了。
所以,通過上面的兩種快取(非託管記憶體+託管記憶體環形陣列)的設計,我們可以確保幾乎不用再從檔案讀取訊息了。那什麼時候還是會從檔案讀取呢?就是在1)記憶體不夠用了;2)當前要讀取的資料不是最近的10000個;這兩個前提下,才會從檔案讀取。一般我們線上伺服器,肯定會保證記憶體是可用的。EQueue現在有兩個記憶體使用的水位。一個是當實體記憶體使用到多少百分比(預設值為40%)時,開始清理已經不再活躍的Chunk檔案的非託管記憶體Chunk;那什麼是不活躍呢?就是在最近5s內沒有發生過讀寫的Chunk。這個設計我覺得是非常有效的,因為假如一個Chunk有5s沒有發生過讀寫,那一般肯定是沒有消費者在消費它了。另一個水位是指,最多EQueue Broker最多使用實體記憶體的多少百分比(預設值為75%),這個應該好理解。這個水位是為了保證EQueue不會把所有實體記憶體都吃光,是為了確保伺服器不會因為記憶體耗盡而宕機或導致服務不可用。
那什麼時候會出現大量使用伺服器記憶體的情況呢?我們可以推匯出來的。正常情況下,訊息寫入第一個Chunk,我們就在讀取第一個Chunk;寫入第二個Chunk我們也會跟著讀取第二個Chunk;假設當前寫入到了第10個Chunk,那理論上前面的9個Chunk之前快取的非託管記憶體都可以釋放了。因為肯定超過5s沒有發生讀寫了。但是假如現在消費者有很多,且每個消費者的消費進度都不同,有些很快,有些很慢,當所有的消費者的消費進度正好覆蓋到所有的Chunk檔案時,就意味著每個Chunk檔案都在發生讀取。也就是說,每個Chunk都是活躍的。那此時就無法釋放任何一個Chunk的非託管記憶體了。這樣就會導致佔用大量非託管記憶體了。但由於75%的水位的設計,Broker記憶體的使用是不會超過實體記憶體75%的。在建立新的Chunk或者嘗試快取一個Completed的Chunk時,總是會判斷當前使用的實體記憶體是否已經超過75%,如果已經超過,就不會分配對應的非託管記憶體了。
如何刪除訊息?
刪除訊息的設計比較簡單。主要的思路是,當我們的訊息已經被所有的消費者都消費過了,且滿足我們的刪除策略了,就可以刪除了。RocketMQ刪除訊息的策略比較粗暴,沒有考慮訊息是否經被消費,而是直接到了一定的時間就刪除了,比如最多隻保留2天。這個是RocketMQ的設計。EQueue中,會確保訊息一定是被所有的消費者都消費了才會考慮刪除。然後目前我設計的刪除策略有兩種:
- 按Chunk檔案數;即設計一個閥值,表示磁碟上最多儲存多少個Chunk檔案。目前預設值為100,每個Chunk檔案的大小為256MB。也就是大概總磁碟佔用25G。一般我們的硬碟肯定有25G的。當我們不關心訊息儲存多久而只從檔案數的角度來決定訊息是否要刪除時,可以使用這個策略;
- 按時間來刪除,預設是7天,即當某個Chunk是7天前建立的,那我們就可以建立了。這種策略是不關心Chunk總共有多少,完全根據時間的維度來判斷。
實際上,應該可能還有一些需求希望能把兩個策略合起來考慮的。這個目前我沒有做,我覺得這兩種應該夠了。如果大家想做,可以自己擴充套件的。
另外,上面我說過EQueue中目前有兩種Chunk檔案,一種是儲存訊息本身的,我叫做Message Chunk;一種是儲存佇列資訊的,我叫做Queue Chunk;而Queue Chunk的資料是依賴於Message Chunk的。上面我說的兩種刪除策略是針對Message Chunk而言的。而Queue Chunk,由於這個依賴性,我覺得比較合理的方式是,只需要判斷當前Queue Chunk中的所有的訊息對應的Message Chunk是否已經都刪除了,如果是,難說明這個Queue Chunk也已經沒意義了,就可以刪除了。但只要這個Queue Chunk中至少還有一個訊息的Chunk檔案沒刪除,那這個Queue Chunk就不會刪除。
上面這個只是思路哦,真實的程式碼肯定比這個複雜,呵呵。有興趣的朋友還是要看程式碼的。
如何查訊息?
之前用SQL Server的方式,由於DB很容易查訊息,所以查詢訊息不是大問題。但是現在我們的訊息是放在檔案裡的,那要怎麼查詢呢?目前支援根據訊息ID來查詢。當Producer傳送一個訊息到Broker,Broker返回結果裡會包含訊息的ID。Producer的正確做法應該是要用日誌或其他方式記錄這個ID,並最好和自己的當前業務訊息的某個業務ID一起記錄,比如CommandId或者EventId,這樣我們就能根據我們的業務ID找到訊息ID,然後根據訊息ID找到訊息內容了。那訊息ID現在是怎麼設計的呢?也是受到RocketMQ的啟發,訊息ID由兩部分組成:1)Broker的IP;2)訊息在Broker的檔案中的全域性位置;這樣,當我們要根據某個訊息ID查詢時,就可以先定位到這個訊息在哪個Broker上,也同時知道了訊息在檔案的哪個位置了,這樣就能最終讀取這個訊息的內容了。
為什麼要這樣設計呢?如果我們的Broker沒有叢集,那其實不需要包含Broker的IP;這個設計是為了未來EQueue Broker會支援叢集的,那個時候,我們就必須要知道某個訊息ID對應的Broker是哪個了。
如何儲存佇列消費進度?
EQueue中,每個Queue,都會有一個對應的Consumer。消費進度就是這個Queue當前被消費到哪裡了,一個Offset值。比如Offset為100,就表示當前這個Queue已經消費到第99(因為是從0開始的)個位置的訊息了。因為一個Broker上有很多的Queue,比如有100個。而我們現在是使用檔案的方式來儲存資訊了。所以自然消費進度也是用檔案了。但由於消費進度的資訊很少,也不是遞增的形式。所以我們可以簡單設計,目前EQueue採用一個檔案的方式來儲存所有Queue的消費進度,檔案內容為JSON,JSON裡記錄了每個Queue的消費進度。檔案內容看起來像下面這樣:
{"SampleGroup":{"topic1-3":89055,"topic1-2":89599,"topic1-1":89471,"topic1-0":89695}}
上面的JSON標識一個名為SampleGroup的ConsumerGroup,他消費了一個名為topic1的topic,然後這個topic下的每個Queue的消費進度記錄了下來。如果有另一個ConsumerGroup,也消費了這個topic,那消費進度是隔離的。如果還不清楚ConsumerGroup的同學,要去看一下我之前寫的EQueue的文章了。
還有沒有可以優化的地方?
到目前為止,還有沒有其他可優化的大的地方呢?有。之前我做EQueue時,總是把訊息從資料庫讀取出來,然後構造出訊息物件,再把訊息物件序列化為二進位制,再返回給Consumer。這裡涉及到從DB拿出來,再序列化為二進位制。學習了RocketMQ的程式碼後,我們可以做的更聰明一點。因為其實基於檔案儲存時,我們從檔案裡拿出來的已經是二進位制了。所以可以直接把二進位制返回給消費者即可。不需要先轉換為物件再做序列化了。通過這個設計的改進,我們現在的消費者消費訊息,可以說無任何瓶頸了,非常快。
如何統計訊息讀寫情況?
在測試寫檔案的這個版本時,我們很希望知道每個Chunk的讀寫情況的統計,從而確定設計是正確的。所以,我給EQueue的Chunk增加了實時統計Chunk讀寫的統計服務。目前我們在執行EQueue自帶的例子時,Broker會每個一秒打印出所有Chunk的讀寫情況,這個特性極大的方便我們判斷訊息的傳送和消費是否正常,消費是否有延遲等。
其他新增功能
更完善和安全的佇列擴容和縮容設計
這次我給EQueue的Web後臺管理控制檯也完善了一下佇列的增加和減少的設計。增加佇列(即佇列的擴容)比較簡單,直接新增即可。但是當我們要刪除一個佇列時,怎樣安全的刪除呢?主要是要確保刪除這個佇列時,已經沒有Producer或Consumer在使用這個隊列了。要怎麼做到呢?我的思路是,為每個Queue物件設計兩個屬性,表示對Producer是否可見,對Consumer是否可見。當我們要刪除某個Queue時,可以:1)先讓其對Producer不可見,這樣Producer後續就不會再發送新的訊息到這個隊列了;然後等待,直到這個佇列裡的訊息都被所有的消費者消費掉了;然後再設定為對Consumer不可見。然後再過幾秒,確保每個消費者都不會再向這個佇列發出拉取訊息的請求了。這樣我們就能安全的刪除這個隊列了。刪除佇列的邏輯大概如如下:
public void DeleteQueue(string topic, int queueId) { lock (this) { var key = QueueKeyUtil.CreateQueueKey(topic, queueId); Queue queue; if (!_queueDict.TryGetValue(key, out queue)) { return; } //檢查佇列對Producer或Consumer是否可見,如果可見是不允許刪除的 if (queue.Setting.ProducerVisible || queue.Setting.ConsumerVisible) { throw new Exception("Queue is visible to producer or consumer, cannot be delete."); } //檢查是否有未消費完的訊息 var minConsumedOffset = _consumeOffsetStore.GetMinConsumedOffset(topic, queueId); var queueCurrentOffset = queue.NextOffset - 1; if (minConsumedOffset < queueCurrentOffset) { throw new Exception(string.Format("Queue is not allowed to delete, there are not consumed messages: {0}", queueCurrentOffset - minConsumedOffset)); } //刪除佇列的消費進度資訊 _consumeOffsetStore.DeleteConsumeOffset(queue.Key); //刪除佇列本身,包括所有的檔案 queue.Delete(); //最後將佇列從字典中移除 _queueDict.Remove(key); } }
程式碼應該很簡單直接,不多解釋了。佇列的動態新增和刪除,可以方便我們線上應付線上活動時,隨時為消費者提供更高的並行消費能力,以及活動結束後去掉多餘的佇列。是非常實用的功能。
支援Tag功能
這個功能,也是非常實用的。這個版本我加了上去。以前EQueue只有Topic的概念,沒有Tag的概念。Tag是對Topic的二級過濾。比如當某個Producer傳送了3個訊息,Topic都是topic,然後tag分別是01,02,03。然後Consumer訂閱了這個Topic,但是訂閱這個Topic時同時制定了Tag,比如指定為02,那這個Consumer就只會收到一個訊息。Tag為01,03的訊息是不會收到的。這個就是Tag的功能。我覺得Tag對我們是非常有用的,它可以極大的減少我們定義Topic。本來我們必須要定義一個新的Topic時,現在可能只需要定義一個Tag即可。關於Tag的實現,我就不展開了。
支援訊息堆積報警
終於到最後一點了,終於堅持快寫完了,呵呵。EQueue Web後臺管理控制檯現在支援訊息堆積的報警了。當EQueue Broker上當前所有未消費的訊息數達到一定的閥值時,就會發送郵件進行報警。我們可以把我們的郵件和我們的手機簡訊進行繫結,比如移動的139郵箱我記得就有這個功能。這樣我們就能第一時間知道Broker上是否有大量訊息堆積了,可以讓我們第一時間處理問題。
結束語
這篇文章感覺是我有史以來寫過的最有乾貨的一篇了,呵呵。一氣呵成,也是對我前面幾個月的所有積累知識經驗的一次性釋放吧。希望能給大家一些幫助。我寫文章比較喜歡寫思路,不太喜歡介紹如何用。我覺得一個程式設計師,最重要的是要學會如何思考去解決自己想解決的問題。而不是別人直接告訴你如何去解決。通過做EQueue這個分散式訊息佇列,也算是我自己的一個實踐過程。我非常鼓勵大家寫開源專案哦,當你專注於實現某個你感興趣的開源專案時,你就會有目標性的去學習相關的知識,你的學習就不會迷茫,不會為了學技術而學技術了。我在做EQuque時,要考慮各種東西,比如通訊層的設計、訊息持久化、整個架構設計,等等。我覺得是非常鍛鍊人的。
一個人時間短暫,如果能用有限的時間做出好的東西可以造福後人,那我們來到這個世上也算沒白來了,你說對嗎?所以,我們千萬不要放棄我們的理想,雖然堅持理想很難,但也要堅持。
相關推薦
C#分散式訊息佇列 EQueue 2.0 釋出啦
前言 最近花了我幾個月的業餘時間,對EQueue做了一個重大的改造,訊息持久化採用本地寫檔案的方式。到現在為止,總算完成了,所以第一時間寫文章分享給大家這段時間我所積累的一些成果。 昨天,我寫過一篇關於EQueue 2.0效能測試結果的文章,有興趣的可以看看。 為什麼要改為檔案儲存? SQL
一個純C#寫的分散式訊息佇列介紹2
一年前,當我第一次開發完EQueue後,寫過一篇文章介紹了其整體架構,做這個框架的背景,以及架構中的所有基本概念。通過那篇文章,大家可以對EQueue有一個基本的瞭解。經過了1年多的完善,EQueue無論是功能上還是成熟性上都完善了不少。所以,希望再寫一篇文章,介紹一下EQueue的整體架構和關鍵特性。 E
BlockLang 0.2.0 釋出啦
BlockLang 官網:https://blocklang.com 歡迎您瞭解 Block Lang 0.2.0 釋出的功能
XXL-MQ v1.2.2 釋出,分散式訊息佇列
Release Notes 1、訪問令牌(accessToken):為提升系統安全性,訊息中心和客戶端進行安全性校驗,雙方AccessToken匹配才允許通訊; 2、支援批量註冊、摘除,提升註冊發現效能;升級 xxl-rpc 至 v1.3.1; 3、升級 pom
XXL-MQ v1.2.0,分散式訊息佇列
Release Notes 1、client端與Broker長鏈初始化優化,防止重複建立連線。 2、POM多項依賴升級; 3、UI元件升級; 4、規範專案目錄結構; 6、超時控制; 5、通訊遷移至 xxl-rpc; 6、除了springboot型別示例;新增無框架
doctest 2.2.0 釋出,快速靈活的 C++ 測試框架
doctest 2.2.0 已釋出,更新內容: remove the FAST_ versions of the binary asserts (not a breaking change!) #167 [compile times] make the D
分散式訊息規範 OpenMessaging 1.0.0-preview 釋出
OpenMessaging 是由阿里巴巴牽頭髮起,由 Yahoo、滴滴、Streamlio、微眾銀行、Datapipeline 等公司共同發起建立的分散式訊息規範,其目標在於打造廠商中立,面向 Cloud Native ,同時對流計算以及大資料生態友好的下一代分散式
C#的分散式訊息佇列介紹
EQueue架構 EQueue是一個分散式的、輕量級、高效能、具有一定可靠性,純C#編寫的訊息佇列,支援消費者叢集消費模式。 主要包括三個部分:producer, broker, consumer。producer就是訊息傳送者;broker就是訊息佇列伺服器,負責接收producer傳送過來的訊息
c#開源訊息佇列中介軟體EQueue 教程
一、簡介 EQueue是一個參照RocketMQ實現的開源訊息佇列中介軟體,相容Mono,具體可以參看作者的文章《分享一個c#寫的開源分散式訊息佇列equeue》。專案開源地址:https://github.com/tangxuehua/equeue,專案中包含了佇列
基於Python的分散式高可用擴充套件引擎Ray 0.2.0釋出
Ray是為python機器學習、深度學習而開發的高可用、高效能的分散式框架,目前已經發布到0.2.0版本(注:2017-11-1已經發布了0.2.2),下面是版本釋出說明 我們很高興釋出Ray 0.2版本釋出,本次釋出包括以下資訊: * Plasma 物
分散式訊息佇列RocketMQ原始碼分析之2 -- Broker與NameServer心跳機制
我們知道,Kafka是通過ZK的臨時節點來監測Broker的死亡的。當一個Broker掛了之後,ZK上面對應的臨時節點被刪除,同時其他Broker收到通知。 那麼在RocketMQ中,對應的NameServer是如何判斷一個Broker的死亡呢? 有興趣朋友
asp.net core 2.0釋出到IIS流程及報錯解決方案
我這是個新裝的伺服器,沒有安裝任何軟體。 一、釋出流程 1.安裝AspNetCoreModule託管模組,同時會自動安裝..net core runtime DotNetCore.2.0.8-WindowsHosting.exe https://docs.microsoft.
大型網站架構系列:分散式訊息佇列(一)(轉)
以下是訊息佇列以下的大綱,本文主要介紹訊息佇列概述,訊息佇列應用場景和訊息中介軟體示例(電商,日誌系統)。 本次分享大綱 訊息佇列概述 訊息佇列應用場景 訊息中介軟體示例 JMS訊息服務(見第二篇:大型網站架構系列:分散式訊息佇列(二)) 常用訊息佇列(見第二篇:大型網站架構系列:分
Kafka分散式訊息佇列
基本架構 Kafka分散式訊息佇列的作用: 解耦:將訊息生產階段和處理階段拆分開,兩個階段互相獨立各自實現自己的處理邏輯,通過Kafka提供的訊息寫入和消費介面實現對訊息的連線處理。降低開發複雜度,提高系統穩定性。 高吞吐率:kafka通過順序讀寫磁碟提供可以和記憶體隨機讀寫相匹敵的讀寫速度,靈活的客戶
基於Docker搭建分散式訊息佇列Kafka
本文基於Docker搭建一套單節點的Kafka訊息佇列,Kafka依賴Zookeeper為其管理叢集資訊,雖然本例不涉及叢集,但是該有的元件都還是會有,典型的kafka分散式架構如下圖所示。本例搭建的示例包含Zookeeper + Kafka + Kafka-manger mark &
效能躍升50%!解密自主研發的金融級分散式關係資料庫OceanBase 2.0
小螞蟻說: 相信大家對螞蟻金服自主研發的金融
RabbitMQ系列之七 分散式訊息佇列應用場景之非同步處理、應用解耦、流量削鋒和訊息通訊理解分析
摘要:訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。 目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,
IPython 7.2.0 釋出,Python 命令列互動
IPython 是 Python 的原生互動式 shell 的增強版,可以完成許多不同尋常的任務,比如幫助實現並行化計算;主要使用它提供的互動性幫助,比如程式碼著色、改進了的命令列回撥、製表符完成、巨集功能以及改進了的互動式幫助。 IPython 7.2.0 帶來了一些小的 bug 修正、改進和新的配
Babel 7.2.0 釋出,JavaScript 編譯器
Babel 7.2.0 已釋出,Babel 是用於編寫下一代 JavaScript 的編譯器。此版本包含對私有例項方法的支援以及一些圍繞 Flow 和 TypeScript 類的 bug 修復。 更新亮點 Private Instance Method
Spark Streaming實時流處理筆記(4)—— 分散式訊息佇列Kafka
1 Kafka概述 和訊息系統類似 1.1 訊息中介軟體 生產者和消費者 1.2 Kafka 架構和概念 producer:生產者(生產饅頭) consumer:消費者(吃饅頭) broker:籃子 topic : 主題,給饅頭帶一個標籤,(