System.IO.Pipelines來對消息進行Buffer合並
System.IO.Pipelines來對消息進行Buffer合並
https://www.cnblogs.com/smark/p/9927455.html
.net core使用Pipelines進行消息IO合並
之前的文章講述過通過IO合並實現百萬級RPS和千萬級消息推送,但這兩篇文章只是簡單地講了一下原理和測試結果並沒有在代碼實現上的講解,這一編文章主要通過代碼的實現來講述消息IO合並的原理。其實在早期的版本實現IO合並還是比較因難的,需要大量的代碼和測試Beetlex是完全自己實現這套機制。不過這一章就不是從Beetlex的實現來講解,因為MS已經提供了一個新東西給以支持,那就是System.IO.Pipelines.在Pipelines的支持下實現消息Buffer的合並變得非常簡單的事情。
消息IO合並原理
其實消息IO合並的原理在這裏再多說一遍,就是多個消息使用同一個網絡IO寫入,其實就是把原來一個消息對應一個Buffer,設計成多個消息寫入同一個Buffer.從原理上實現可以看以下圖解。
System.IO.Pipelines介紹
System.IO.Pipelines: High performance IO in .NET, 微軟是這樣說的了解詳情 但我了解System.IO.Pipelines後發現其實是一個安全可靠的內存池讀寫+狀態態通知機制;不過這套機制對普通開發者來說是件非常復雜的工作,主要原因是一但處理不好的情況那就導致內存泄露的可能!基於System.IO.Pipelines這套機制,可以非常方便地把消息和網絡buffer分離出來。接下來就講一下使用System.IO.Pipelines實現自動批量把消息合並到Buffer中。
Pipe類
針對System.IO.Pipelines的介紹說得還是挺神的,其實打開System.IO.Pipelines一看你就發現就幾抽像類,真正使用的就只有Pipe一個類.Pipe看上去更像一個Stream提供一個Read和write屬性。Writer屬性是寫入數據,而Reader則是讀取消息,不過這兩個屬性對象基於狀態交互所以兩者可以分別在不同的線程進行處理。
消息隊列和寫入
前面的原理已經講了,如果想消息能合並那就需要一個隊列,然後確保同一時間只有一個線程來處理隊列中的消息。如果當前線程檢測到隊列中有多個消息那就可以獲取所有消息進行一個批序列化,接下來看一下這代碼代碼是怎樣實現的.
復制代碼
private async void OnMergeWrite(object state)
{
while (true)
{
var memory = mWrite.GetMemory(2048);
var length = memory.Length;
int offset = 0;
int count = 0;
while (_msgQueues.TryDequeue(out string msg))
{
if (length < msg.Length)
{
mWrite.Advance(count);
memory = mWrite.GetMemory(2048);
length = memory.Length;
offset = 0;
count = 0;
}
var elen = System.Text.Encoding.ASCII.GetBytes(msg, memory.Slice(offset, msg.Length).Span);
count += elen;
offset += elen;
length -= elen;
}
if (count > 0)
mWrite.Advance(count);
await mWrite.FlushAsync();
lock (_workSync)
{
if (_msgQueues.IsEmpty)
{
_doingWork = false;
return;
}
}
}
}
復制代碼
代碼並不復雜,進入線程不斷地獲取消息並序列化到Buffer中,當Buffer滿了後提交給Writer後重新獲取Buffer繼續序列化。當沒有消息的時候再一次檢測隊列如果又存在消息則繼續,為什麽需要兩層While來檢測呢,主要是和隊列寫入狀態檢測的一致性判斷。
復制代碼
public void Enqueue(string message)
{
_msgQueues.Enqueue(message);
lock (_workSync)
{
if (!_doingWork)
{
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(OnMergeWrite, this);
_doingWork = true;
}
}
}
復制代碼
以上是消息寫入隊列方法。
Pipe數據讀取
由於Pipe的Write和Read是基於狀態同步,所以Reader可以在任何意時間和任意線程中進行讀取,以下是Read的代碼:
復制代碼
private async static void Read(object state)
{
int count = 0;
while (true)
{
var result = await pipe.Reader.ReadAsync();
var buffer = result.Buffer;
var end = buffer.End;
if (buffer.IsSingleSegment)
{
Console.WriteLine(System.Text.Encoding.ASCII.GetString(buffer.First.Span));
// SAEA.Memory=buffer;
}
else
{
foreach (var b in buffer)
{
Console.WriteLine(System.Text.Encoding.ASCII.GetString(b.Span));
}
//SAEA.BufferList=buffer;
}
pipe.Reader.AdvanceTo(end);
count++;
Console.WriteLine(count);
}
}
復制代碼
測試
代碼寫完了,接下來的工作就是通過測試看一下是不是達到合並的效果,以下開啟兩個線程分別連續寫入1000個消息。
復制代碼
static void Main(string[] args)
{
pipe = new Pipe();
messageQueue = new MessageQueue(pipe.Writer);
System.Threading.ThreadPool.QueueUserWorkItem(Read);
System.Threading.ThreadPool.QueueUserWorkItem(Write, "AAAA");
System.Threading.ThreadPool.QueueUserWorkItem(Write, "BBBB");
Console.Read();
}
private static void Write(object state)
{
string name = (string)state;
for (int i = 0; i < 1000; i++)
{
messageQueue.Enqueue($"[{name + i}]");
}
}
復制代碼
實際運行效果:
總結
通過以上示例相信大家對System.IO.Pipelines來對消息進行Buffer合並有一個很好的理解,不過實際情況處理的是對象消息則相對復雜一些,畢竟消息的大小是不可知的,不過可以針對最大消息長度來分析Buffer,確保一個Buffer能夠序列化一個或多個消息即可。如果你想拋開System.IO.Pipelines更深入地了解實現原因可以查看Beetlex的源碼,具體位置在:PipeStream
最後奉上以上示例的代碼http://www.ikende.com/Files/SocketIOMerge.zip?tag=manager
System.IO.Pipelines來對消息進行Buffer合並