Lind.DDD.LindMQ~關於持久化到Redis的訊息格式
關於持久化到Redis的訊息格式,主要是說在Broker上把訊息持久化的過程中,需要儲存哪些型別的訊息,因為我們的訊息是分topic的,而每個topic又有若干個queue組成,而我們的topic和queue由於redis儲存結構的原因,我們需要將它們分割槽對應儲存一下,而不能像關係型資料庫那樣靈活,所以要額外設計幾個資料結構來儲存它們。
一 Topic字典
二 Topic對應的Queue字典
三 Queue裡的訊息
四 某個客戶端對應某個Queue的消費進度
以上四個結構是我們要說的,它們會在推訊息,拉訊息,刪訊息時用到,下面一一介紹一下,講的不好不對的地方,歡迎大家為大叔留言。
一 Topic字典
主要儲存每個topic,它是一個set集合,redis的我集合型別之一,每個key是唯一的LindMq_Topic,值value就是我們客戶端傳來的具體topic的名字,這主要是在刪除過期的訊息時用的,主是作用是遍歷所有的topic訊息型別,這樣我們在刪除訊息時,就可以把所有註冊的topic都找到了,最後把過期的刪除,預設訊息存活週期是一天。
刪除過期的訊息程式碼如下
var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);foreach (var topic in topicList) { var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic); foreach (var queue in queueList) { var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd"); RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey); } }
二 Topic對應的Queue字典
我們知道,為了加大redis的併發量和吞吐量,我們會把大資料鍵值對設計成多個鍵,這就像是一個叢集環境的sharing,就是將大資料進行分片,而我們的分片規則是採用按物件取模的方式,模數可以自己設定,比較我設定8,那說明我的佇列(分片)最多可以被分為8個,這個大家可以去做測試,挺有意思的,比隨機數來個直接!而這一次redis裡的鍵就是某個topic,而值就是我們的topic加上佇列索引,例如你的topic是zzl,那麼佇列裡的鍵可能就是zzl0,zzl1,zzl2...
三 Queue裡的訊息
我們的生產者將訊息傳送到broker裡,然後於broker將訊息持久化到具體的儲存介質裡,當然這裡我們用的是Redis,在儲存在redis裡時,我們的具體佇列的鍵是有後綴的,這主要用於訊息的回收,因為我們打算1天回收一次訊息,所以我們的訊息字尾是個日期變數,當然精確到天就可以了,它可以是這樣鍵名LindMQ_order_Paid4_20161202,每個佇列都有自己的字尾,我們在清除訊息時也就有了方法了。我們的佇列儲存結構是比較特殊的sortedSet ,就是可排序的集合,它有權重的概念,我們剛好可以使用這個特性來記錄客戶端的消費進度,因為我們的權重值在一個redis鍵/值對裡是唯一的。
下面程式碼選自Push入佇列的程式碼片斷,分享給大家
//儲存當前Topic RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic); //要儲存到哪個佇列 body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT); var dataKey = body.Topic + body.QueueId; RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey); //記錄偏移 var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey)); body.QueueOffset = offset + 1; //儲存訊息 RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd( GetRedisDataKey(dataKey), Utils.SerializeMemoryHelper.SerializeToJson(body), score: body.QueueOffset);
四 某個客戶端對應某個Queue的消費進度
消費進度是一個很麻煩的問題,生產者的訊息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出佇列後就消失了,這是不靠譜的,萬一消失失敗了,也會造成訊息的丟失!下面我們主要看一下消費進度的儲存,它是一個Hash集合,其中redis的鍵名是LindMQ_ConsumerOffset,而value是一個hash物件,hash裡的key是當前佇列名+消費者IP地址的hashcode值,hash裡的value是這個消費者(客戶端)的消費進度(Queue裡的權重,Queue的儲存結構是一個sortedSet)。
客戶端消費的測試程式碼
#region Client-LindMQ var consumer = new ConsumerSetting { BrokenName = "test", BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406), Callback = new Dictionary<string, Action<MessageBody>>() { {"zzl",(o)=>{ Console.WriteLine(o.ToString()); Thread.Sleep(1000); }}, {"zhz",(o)=>{ Console.WriteLine(o.ToString()); Thread.Sleep(2000); }} } }; var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer }); consumerClient.Start(); #endregion
客戶端消費的測試結果
好了,到這裡我們的LindMQ裡資料儲存結構的內容就講完了,主要使用了redis裡的set,sortedSet,hash等資料結構,在設計過程中,使用了分片(Sharing)的概念,當然也是借鑑了mongodb和redis叢集的設計理念,同時借鑑了方雪華老兄的EQueue設計理念,在這裡和他們說一聲:謝謝!
感謝各位對Lind的支援!