1. 程式人生 > >Lind.DDD.LindMQ~關於持久化到Redis的訊息格式

Lind.DDD.LindMQ~關於持久化到Redis的訊息格式

回到目錄

關於持久化到Redis的訊息格式,主要是說在Broker上把訊息持久化的過程中,需要儲存哪些型別的訊息,因為我們的訊息是分topic的,而每個topic又有若干個queue組成,而我們的topic和queue由於redis儲存結構的原因,我們需要將它們分割槽對應儲存一下,而不能像關係型資料庫那樣靈活,所以要額外設計幾個資料結構來儲存它們。

一 Topic字典

二 Topic對應的Queue字典

三 Queue裡的訊息

四 某個客戶端對應某個Queue的消費進度

以上四個結構是我們要說的,它們會在推訊息,拉訊息,刪訊息時用到,下面一一介紹一下,講的不好不對的地方,歡迎大家為大叔留言。

一 Topic字典

主要儲存每個topic,它是一個set集合,redis的我集合型別之一,每個key是唯一的LindMq_Topic,值value就是我們客戶端傳來的具體topic的名字,這主要是在刪除過期的訊息時用的,主是作用是遍歷所有的topic訊息型別,這樣我們在刪除訊息時,就可以把所有註冊的topic都找到了,最後把過期的刪除,預設訊息存活週期是一天。

刪除過期的訊息程式碼如下

 var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);
  
foreach (var topic in topicList) { var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic); foreach (var queue in queueList) {  var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-1).ToString("yyyyMMdd");   RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey); } }

二 Topic對應的Queue字典

我們知道,為了加大redis的併發量和吞吐量,我們會把大資料鍵值對設計成多個鍵,這就像是一個叢集環境的sharing,就是將大資料進行分片,而我們的分片規則是採用按物件取模的方式,模數可以自己設定,比較我設定8,那說明我的佇列(分片)最多可以被分為8個,這個大家可以去做測試,挺有意思的,比隨機數來個直接!而這一次redis裡的鍵就是某個topic,而值就是我們的topic加上佇列索引,例如你的topic是zzl,那麼佇列裡的鍵可能就是zzl0,zzl1,zzl2...

三 Queue裡的訊息

我們的生產者將訊息傳送到broker裡,然後於broker將訊息持久化到具體的儲存介質裡,當然這裡我們用的是Redis,在儲存在redis裡時,我們的具體佇列的鍵是有後綴的,這主要用於訊息的回收,因為我們打算1天回收一次訊息,所以我們的訊息字尾是個日期變數,當然精確到天就可以了,它可以是這樣鍵名LindMQ_order_Paid4_20161202,每個佇列都有自己的字尾,我們在清除訊息時也就有了方法了。我們的佇列儲存結構是比較特殊的sortedSet ,就是可排序的集合,它有權重的概念,我們剛好可以使用這個特性來記錄客戶端的消費進度,因為我們的權重值在一個redis鍵/值對裡是唯一的。

下面程式碼選自Push入佇列的程式碼片斷,分享給大家

       //儲存當前Topic
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic);

            //要儲存到哪個佇列
            body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);
            var dataKey = body.Topic + body.QueueId;
            RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey);

            //記錄偏移
            var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));
            body.QueueOffset = offset + 1;

            //儲存訊息
            RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(
                GetRedisDataKey(dataKey),
                Utils.SerializeMemoryHelper.SerializeToJson(body),
                score: body.QueueOffset);

四 某個客戶端對應某個Queue的消費進度

消費進度是一個很麻煩的問題,生產者的訊息是可以被多個消費者消費的,所以不能使用.net那種簡單的Queue機制,出佇列後就消失了,這是不靠譜的,萬一消失失敗了,也會造成訊息的丟失!下面我們主要看一下消費進度的儲存,它是一個Hash集合,其中redis的鍵名是LindMQ_ConsumerOffset,而value是一個hash物件,hash裡的key是當前佇列名+消費者IP地址的hashcode值,hash裡的value是這個消費者(客戶端)的消費進度(Queue裡的權重,Queue的儲存結構是一個sortedSet)。

客戶端消費的測試程式碼

            #region Client-LindMQ
            var consumer = new ConsumerSetting
            {
                BrokenName = "test",
                BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), 8406),
                Callback = new Dictionary<string, Action<MessageBody>>() { 
                {"zzl",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(1000);
                }},
                {"zhz",(o)=>{
                    Console.WriteLine(o.ToString());
                    Thread.Sleep(2000);
                }}
                }
            };
            var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });
            consumerClient.Start();
            #endregion

客戶端消費的測試結果

好了,到這裡我們的LindMQ裡資料儲存結構的內容就講完了,主要使用了redis裡的set,sortedSet,hash等資料結構,在設計過程中,使用了分片(Sharing)的概念,當然也是借鑑了mongodb和redis叢集的設計理念,同時借鑑了方雪華老兄的EQueue設計理念,在這裡和他們說一聲:謝謝!

感謝各位對Lind的支援!

回到目錄