使用Dotnetty解決粘包問題
一,為什麼TCP會有粘包和拆包的問題
粘包:TCP傳送方傳送多個數據包,接收方收到資料時這幾個資料包粘成了一個包,從接收緩衝區看,後一包資料的頭緊接著前一包資料的尾,接收方必需根據協議將這幾個資料包分離出來才能得到正確的資料。
為什麼會發生粘包,從幾個方面來看:
1,TCP是基於位元組流的,TCP的報文沒有規劃資料長度,傳送端和接收端從快取中取資料,應用程式對於訊息的長度是不可見的,不知道資料流應該從什麼地方開始,從什麼地方結束。
2,傳送方:TCP預設啟用了Negal演算法,優化資料流的傳送效率,Nagle演算法主要做兩件事:1)只有上一個分組得到確認,才會傳送下一個分組;2)收集多個小分組,在一個確認到來時一起傳送。
3,接收方:TCP將收到的分組儲存至接收快取裡,然後應用程式主動從快取裡讀收。這樣一來,如果TCP接收的速度大於應用程式讀的速度,多個包就會被存至快取,應用程式讀時,就會讀到多個首尾相接粘到一起的包。
二,Dotnetty專案
Dotnetty監聽埠,啟用管理處理器
public class NettyServer { public async System.Threading.Tasks.Task RunAsync(int[] port) { IEventLoopGroup bossEventLoop = new MultithreadEventLoopGroup(port.Length); IEventLoopGroup workerLoopGroup = new MultithreadEventLoopGroup(); try { ServerBootstrap boot = new ServerBootstrap(); boot.Group(bossEventLoop, workerLoopGroup) .Channel<TcpServerSocketChannel>() .Option(ChannelOption.SoBacklog, 100) .ChildOption(ChannelOption.SoKeepalive, true) .Handler(new LoggingHandler("netty server")) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IPEndPoint ip = (IPEndPoint)channel.LocalAddress; Console.WriteLine(ip.Port); channel.Pipeline.AddLast(new NettyServerHandler()); })); List<IChannel> list = new List<IChannel>(); foreach(var item in port) { IChannel boundChannel = await boot.BindAsync(item); list.Add(boundChannel); } Console.WriteLine("按任意鍵退出"); Console.ReadLine(); list.ForEach(r => { r.CloseAsync(); }); //await boundChannel.CloseAsync(); } catch(Exception ex) { Console.WriteLine(ex.Message); } finally { await bossEventLoop.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); await workerLoopGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); } } }
管理處理器,簡單列印一下收到的內容
public class NettyServerHandler: ChannelHandlerAdapter { public override void ChannelRead(IChannelHandlerContext context, object message) { if (message is IByteBuffer buffer) { StringBuilder sb = new StringBuilder(); while (buffer.IsReadable()) { sb.Append(buffer.ReadByte().ToString("X2")); } Console.WriteLine($"RECIVED FROM CLIENT : {sb.ToString()}"); } } public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }
入口方法啟動服務,監聽四個埠
static void Main(string[] args) { new NettyServer().RunAsync(new int[] { 9001 ,9002,9003,9004}).Wait(); }
二,在應用層解決TCP的粘包問題
要在應用層解決粘包問題,只需要讓程式知道資料流什麼時候開始,什麼時候結束。按常用協議規劃,可以有以下四種方案:
1,定長協議解碼。每次傳送的資料包長度為固定的,這種協議最簡單,但沒有靈活性。
粘包處理:根據固定的長度處理。如協議長度為4,傳送方傳送資料包 FF 00 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA
拆包處理:根據固定的長度處理。如協議長度為4,傳送方傳送二個數據包 FF 00, 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA
Dotnetty實現:監聽9001埠,處理室長協議資料包
新建一個管理處理器,當快取中的可讀長度達到4,處理資料包,管道向下執行,否則不處理。
public class KsLengthfixedHandler : ByteToMessageDecoder { protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) { if (input.ReadableBytes < 12) { return; // (3) } output.Add(input.ReadBytes(12)); // (4) } }
在啟動伺服器是根據埠號加入不同的管道處理器
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IPEndPoint ip = (IPEndPoint)channel.LocalAddress; Console.WriteLine(ip.Port); if (ip.Port == 9001) { channel.Pipeline.AddLast(new KsLengthfixedHandler()); }
channel.Pipeline.AddLast(new NettyServerHandler());
}
2,行解碼,即每個資料包的結尾都是/r/n或者/n(換成16進製為0d0a)。
粘包處理:根據換行符做分隔處理。如傳送方傳送一包資料 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
拆包處理:根據換行符做分隔處理。如傳送方傳送二包資料 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
Dotnetty實現:監聽9002埠,處理行結尾協議資料包
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IPEndPoint ip = (IPEndPoint)channel.LocalAddress; Console.WriteLine(ip.Port); if (ip.Port == 9001) { channel.Pipeline.AddLast(new KsLengthfixedHandler()); } else if (ip.Port == 9002) { channel.Pipeline.AddLast(new LineBasedFrameDecoder( maxLength: 1024, //可接收資料包最大長度 stripDelimiter: true, //解碼後的資料包是否去掉分隔符 failFast: false //是否讀取超過最大長度的資料包內容 )); }
channel.Pipeline.AddLast(new NettyServerHandler());
}
3,特定的分隔符解碼。與行解碼規定了以換行符做分隔符不同,這個解碼方案可以自己定義分隔符。
粘包處理:根據自定義的分隔符做分隔處理。如傳送方傳送一包資料 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
拆包處理:根據自定義的分隔符做分隔處理。如傳送方傳送二包資料 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
下面的例項是自定義了一個以“}”為分隔符的解碼器
Dotnetty實現:監聽9003埠,處理自定義分隔符協議資料包
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IPEndPoint ip = (IPEndPoint)channel.LocalAddress; Console.WriteLine(ip.Port); if (ip.Port == 9001) { channel.Pipeline.AddLast(new KsLengthfixedHandler()); } else if (ip.Port == 9002) { channel.Pipeline.AddLast(new LineBasedFrameDecoder( maxLength: 1024, //可接收資料包最大長度 stripDelimiter: true, //解碼後的資料包是否去掉分隔符 failFast: false //是否讀取超過最大長度的資料包內容 )); } else if (ip.Port == 9003) { IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}")); channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder( maxFrameLength: 1024, stripDelimiter: true, failFast: false, delimiter: delimiter)); }
channel.Pipeline.AddLast(new NettyServerHandler());
}
4,指定長度標識。與第一種方案的固定長度不同,這種方案可以指定一個位置存放該資料包的長度,以便程式推算出開始及結束位置。Modbus協議是典型的變長協議。
Dotnetty中使用LengthFieldBasedFrameDecoder解碼器對變長協議解析,瞭解下以下幾個引數:
* +------+--------+------+----------------+ * | HDR1 | Length | HDR2 | Actual Content | * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+
1,maxFrameLength:資料包最大長度
2,lengthFieldOffset:長度標識的偏移量。如上面的協議,lengthFieldOffset為1(HDR1的長度)
3,lengthFieldLength:長度標識位的長度。如上面的協議,lengthFieldLength為2(Length的長度)
4,lengthAdjustment:調整長度。如上面的協議,0x000c轉為10進製為12,只標識了Content的長度,並不包括HDR2的長度,在解析時就要設定該默值為HDR2的長度。
5,initialBytesToStrip:從何處開始剝離。如上面的協議,如果想要的解析結果為:0xFE "HELLO, WORLD" 則將initialBytesToStrip設定為3(HDR1的長度+Length的長度)。
Dotnetty實現:監聽9004埠處理變長協議
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => { IPEndPoint ip = (IPEndPoint)channel.LocalAddress; Console.WriteLine(ip.Port); if (ip.Port == 9001) { channel.Pipeline.AddLast(new KsLengthfixedHandler()); } else if (ip.Port == 9002) { channel.Pipeline.AddLast(new LineBasedFrameDecoder( maxLength: 1024, //可接收資料包最大長度 stripDelimiter: true, //解碼後的資料包是否去掉分隔符 failFast: false //是否讀取超過最大長度的資料包內容 )); } else if (ip.Port == 9003) { IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}")); channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder( maxFrameLength: 1024, stripDelimiter: true, failFast: false, delimiter: delimiter)); } else if (ip.Port == 9004) { channel.Pipeline.AddLast(new LengthFieldBasedFrameDecoder( maxFrameLength: 1024, lengthFieldOffset: 1, lengthFieldLength: 2)); } channel.Pipeline.AddLast(new NettyServerHandler()); }));