1. 程式人生 > >.net core使用Pipelines進行訊息IO合併

.net core使用Pipelines進行訊息IO合併

之前的文章講述過通過IO合併實現百萬級RPS和千萬級訊息推送,但這兩篇文章只是簡單地講了一下原理和測試結果並沒有在程式碼實現上的講解,這一編文章主要通過程式碼的實現來講述訊息IO合併的原理。其實在早期的版本實現IO合併還是比較因難的,需要大量的程式碼和測試Beetlex是完全自己實現這套機制。不過這一章就不是從Beetlex的實現來講解,因為MS已經提供了一個新東西給以支援,那就是System.IO.Pipelines.在Pipelines的支援下實現訊息Buffer的合併變得非常簡單的事情。

訊息IO合併原理

其實訊息IO合併的原理在這裡再多說一遍,就是多個訊息使用同一個網路IO寫入,其實就是把原來一個訊息對應一個Buffer,設計成多個訊息寫入同一個Buffer.從原理上實現可以看以下圖解。

System.IO.Pipelines介紹

System.IO.Pipelines: High performance IO in .NET, 微軟是這樣說的瞭解詳情 但我瞭解System.IO.Pipelines後發現其實是一個安全可靠的記憶體池讀寫+狀態態通知機制;不過這套機制對普通開發者來說是件非常複雜的工作,主要原因是一但處理不好的情況那就導致記憶體洩露的可能!基於System.IO.Pipelines這套機制,可以非常方便地把訊息和網路buffer分離出來。接下來就講一下使用System.IO.Pipelines實現自動批量把訊息合併到Buffer中。

Pipe類

針對System.IO.Pipelines的介紹說得還是挺神的,其實開啟System.IO.Pipelines一看你就發現就幾抽像類,真正使用的就只有Pipe

一個類.Pipe看上去更像一個Stream提供一個Read和write屬性。Writer屬性是寫入資料,而Reader則是讀取訊息,不過這兩個屬性物件基於狀態互動所以兩者可以分別在不同的執行緒進行處理。

訊息佇列和寫入

前面的原理已經講了,如果想訊息能合併那就需要一個佇列,然後確保同一時間只有一個執行緒來處理佇列中的訊息。如果當前執行緒檢測到佇列中有多個訊息那就可以獲取所有訊息進行一個批序列化,接下來看一下這程式碼程式碼是怎樣實現的.

        private async void OnMergeWrite(object state)
        {
            
while (true) { var memory = mWrite.GetMemory(2048); var length = memory.Length; int offset = 0; int count = 0; while (_msgQueues.TryDequeue(out string msg)) { if (length < msg.Length) { mWrite.Advance(count); memory = mWrite.GetMemory(2048); length = memory.Length; offset = 0; count = 0; } var elen = System.Text.Encoding.ASCII.GetBytes(msg, memory.Slice(offset, msg.Length).Span); count += elen; offset += elen; length -= elen; } if (count > 0) mWrite.Advance(count); await mWrite.FlushAsync(); lock (_workSync) { if (_msgQueues.IsEmpty) { _doingWork = false; return; } } } }

程式碼並不複雜,進入執行緒不斷地獲取訊息並序列化到Buffer中,當Buffer滿了後提交給Writer後重新獲取Buffer繼續序列化。當沒有訊息的時候再一次檢測佇列如果又存在訊息則繼續,為什麼需要兩層While來檢測呢,主要是和佇列寫入狀態檢測的一致性判斷。

      public void Enqueue(string message)
        {
            _msgQueues.Enqueue(message);
            lock (_workSync)
            {
                if (!_doingWork)
                {
                    System.Threading.ThreadPool.UnsafeQueueUserWorkItem(OnMergeWrite, this);
                    _doingWork = true;
                }
            }
        }

以上是訊息寫入佇列方法。

Pipe資料讀取

由於Pipe的Write和Read是基於狀態同步,所以Reader可以在任何意時間和任意執行緒中進行讀取,以下是Read的程式碼:

        private async static void Read(object state)
        {
            int count = 0;
            while (true)
            {
                var result = await pipe.Reader.ReadAsync();
                var buffer = result.Buffer;
                var end = buffer.End;
                if (buffer.IsSingleSegment)
                {
                    Console.WriteLine(System.Text.Encoding.ASCII.GetString(buffer.First.Span));
                    // SAEA.Memory=buffer;
                }
                else
                {
                    foreach (var b in buffer)
                    {
                        Console.WriteLine(System.Text.Encoding.ASCII.GetString(b.Span));
                    }
                    //SAEA.BufferList=buffer;
                }
                pipe.Reader.AdvanceTo(end);
                count++;
                Console.WriteLine(count);
            }
        }

測試

程式碼寫完了,接下來的工作就是通過測試看一下是不是達到合併的效果,以下開啟兩個執行緒分別連續寫入1000個訊息。

        static void Main(string[] args)
        {
            pipe = new Pipe();
            messageQueue = new MessageQueue(pipe.Writer);
            System.Threading.ThreadPool.QueueUserWorkItem(Read);
            System.Threading.ThreadPool.QueueUserWorkItem(Write, "AAAA");
            System.Threading.ThreadPool.QueueUserWorkItem(Write, "BBBB");
            Console.Read();
        }
        private static void Write(object state)
        {
            string name = (string)state;
            for (int i = 0; i < 1000; i++)
            {
                messageQueue.Enqueue($"[{name + i}]");
            }
        }

實際執行效果:

總結

通過以上示例相信大家對System.IO.Pipelines來對訊息進行Buffer合併有一個很好的理解,不過實際情況處理的是物件訊息則相對複雜一些,畢竟訊息的大小是不可知的,不過可以針對最大訊息長度來分析Buffer,確保一個Buffer能夠序列化一個或多個訊息即可。如果你想拋開System.IO.Pipelines更深入地瞭解實現原因可以檢視Beetlex的原始碼,具體位置在:PipeStream