BeetleX之TCP服務應用詳解
BeetleX
是.net core
平臺下的一個開源TCP
通訊元件,它不僅使用簡便還提供了出色效能的支援,可以輕易讓你實現上百萬級別RPS吞吐的服務應用。元件所提供的基礎功能也非常完善,可以讓你輕易擴充套件自己的服務應用,以下提元件整合的功能:
-
完善的會話管理機制,可以根據連線狀態和相關日誌
-
專門針對記憶體池實現的非同步流讀寫,支援標準Stream的同並提供高效的效能
-
訊息IO合併,廣播序列化合並等效能強化功能
-
提供簡潔的協議擴充套件規範,輕易實現http,websocket,mqtt等應用通訊協議
-
支援TLS,讓你構建的通訊服務更安全可靠
擴充套件的元件
以下是Beetlex
擴充套件的一些功能元件
- https://github.com/IKende/FastHttpApi
- https://github.com/IKende/Bumblebee
- https://github.com/IKende/BeetleX.Redis
- https://github.com/IKende/XRPC
- https://github.com/IKende/HttpClients
效能
一開始說組可以讓你現上百萬級別RPS吞吐的服務應用其實一點不假,BeetleX
的基礎效能有這樣的支撐能力;雖然元件不能說是.net core
上效能最好的,但在功能和綜合性能上絕對非常出色(詳細可以https://tfb-status.techempower.com/ 檢視測試結果,可惜這個網站提交的.net core
JSON serialization
基礎輸出的一個測試結果(Plaintext
在官方的測試環境一直沒辦法跑起來....)
在測試中元件只落後於aspcore-rhtx
這是紅帽專門針對 .net core編寫的linux網路驅動.
Single query
構建基礎TCP應用
元件在構建TCP服務的時候非常簡單,主要歸功於它提供了完善的Stream
讀寫功能,而這些功能讓你完全不用關心bytes的讀寫。基於Stream
的好處就是可以輕鬆和第三方序列化的元件進行整合。以下是簡單地構建一個Hello
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } public override void SessionReceive(IServer server, SessionReceiveEventArgs e) { var pipeStream = e.Stream.ToPipeStream(); if (pipeStream.TryReadLine(out string name)) { Console.WriteLine(name); pipeStream.WriteLine("hello " + name); e.Session.Stream.Flush(); } base.SessionReceive(server, e); } }
以上就是一個簡單的TCP
服務,讓以程式碼正常執行需要引用Beetlex
最新版的元件可以在Nuget上找到。以上服務的功能很簡單當接收資料後嘗試從流中讀取一行字元,如果讀取成功則把內容寫入到流中提交返回。通過以上程式碼是不是感覺寫個服務比較簡單(但是PipeStream
並不是執行緒安全的,所以不能涉及到多執行緒讀寫它)
協議處理規則
其實PipeStream
處理資料已經非常方便了,那為什麼還需要制定一個協議處理規範呢?前面已經說了PipeStream
並不是執行緒安全的,很容易帶來使用上的風險,所以引入協議處理規則來進行一個安全約束的同時可以實現多執行緒訊息處理。元件提供了這樣一個介面來規範訊息的處理,介面如下:
public interface IPacket : IDisposable { EventHandler<EventArgs.PacketDecodeCompletedEventArgs> Completed { get; set; } IPacket Clone(); void Decode(ISession session, System.IO.Stream stream); void Encode(object data, ISession session, System.IO.Stream stream); byte[] Encode(object data, IServer server); ArraySegment<byte> Encode(object data, IServer server, byte[] buffer); }
如果你要處理訊息物件,則需要實現以上介面(當然這個介面的實現不是必須的,只要把握好PipeStream
安全上的控制就好);但實現這介面來處理訊息可以帶很多好處,可以多訊息合併IO,廣播訊息合併序列化等高效的功能。不過在不瞭解元件的情況實現這個介面的確也是有些難度的,所以元件提供了一個基礎的類FixedHeaderPacket
,它是一個抽像類用於描述有個訊息頭長的資訊流處理。
字元訊息分包
接下來通過FixedHeaderPacket
來實現一個簡單的字元分包協議訊息;主要在傳送訊息的時候新增一個大小頭用來描述訊息的長度(這是在TCP中解決粘包的主要手段)。
public class StringPacket : BeetleX.Packets.FixedHeaderPacket { public override IPacket Clone() { return new StringPacket(); } protected override object OnReader(ISession session, PipeStream stream) { return stream.ReadString(CurrentSize); } protected override void OnWrite(ISession session, object data, PipeStream stream) { stream.Write((string)data); } }
通過FixedHeaderPacket
制定一個分包規則是非常簡單的,主要實現讀寫兩個方法。下面即可在服務中引用這個包作為TCP
資料流的分析規則:
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program,StringPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { Console.WriteLine(message); server.Send($"hello {message}", session); } }
經過分析器包裝後,就再也不用流來處理資料了,可以直接進行對像的傳送處理。
整合Protobuf
處理String
並不是友好的事情,畢竟沒有物件來得直觀和操作方便;以下是通過FixedHeaderPacket
擴充套件Protobuf
物件傳輸,以下是針對Protobuf
的規則擴充套件:
public class ProtobufPacket : BeetleX.Packets.FixedHeaderPacket { static ProtobufPacket() { TypeHeader.Register(typeof(ProtobufClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new ProtobufPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, type, size); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, data); } }
使用規則分析器
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program, Messages.ProtobufPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { ((Messages.Register)message).DateTime = DateTime.Now; server.Send(message, session); } }
不同序列化的擴充套件
既然有了一個Protobuf
作為樣本,那針對其他序列化的實現就比較簡單了
- json
public class JsonPacket : BeetleX.Packets.FixedHeaderPacket { static JsonPacket() { TypeHeader.Register(typeof(JsonClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new JsonPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(size); stream.Read(buffer, 0, size); try { return SpanJson.JsonSerializer.NonGeneric.Utf8.Deserialize(new ReadOnlySpan<byte>(buffer, 0, size), type); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer); } } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); var buffer = SpanJson.JsonSerializer.NonGeneric.Utf8.SerializeToArrayPool(data); try { stream.Write(buffer.Array, buffer.Offset, buffer.Count); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer.Array); } } }
- messagepack
public class MsgpackPacket : BeetleX.Packets.FixedHeaderPacket { static MsgpackPacket() { TypeHeader.Register(typeof(MsgpackClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new MsgpackPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return MessagePackSerializer.NonGeneric.Deserialize(type, stream, true); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); MessagePackSerializer.NonGeneric.Serialize(data.GetType(), stream, data); } }
更多示例
https://github.com/IKende/BeetleX-Sam