1. 程式人生 > 實用技巧 >使用Dotnetty解決粘包問題

使用Dotnetty解決粘包問題

一,為什麼TCP會有粘包和拆包的問題

粘包:TCP傳送方傳送多個數據包,接收方收到資料時這幾個資料包粘成了一個包,從接收緩衝區看,後一包資料的頭緊接著前一包資料的尾,接收方必需根據協議將這幾個資料包分離出來才能得到正確的資料。

為什麼會發生粘包,從幾個方面來看:

1,TCP是基於位元組流的,TCP的報文沒有規劃資料長度,傳送端和接收端從快取中取資料,應用程式對於訊息的長度是不可見的,不知道資料流應該從什麼地方開始,從什麼地方結束。

2,傳送方:TCP預設啟用了Negal演算法,優化資料流的傳送效率,Nagle演算法主要做兩件事:1)只有上一個分組得到確認,才會傳送下一個分組;2)收集多個小分組,在一個確認到來時一起傳送。

3,接收方:TCP將收到的分組儲存至接收快取裡,然後應用程式主動從快取裡讀收。這樣一來,如果TCP接收的速度大於應用程式讀的速度,多個包就會被存至快取,應用程式讀時,就會讀到多個首尾相接粘到一起的包。

二,Dotnetty專案

Dotnetty監聽埠,啟用管理處理器

public class NettyServer
    {
        public async System.Threading.Tasks.Task RunAsync(int[] port)
        {
            IEventLoopGroup bossEventLoop = new MultithreadEventLoopGroup(port.Length);
            IEventLoopGroup workerLoopGroup = new MultithreadEventLoopGroup();
            try
            {
                ServerBootstrap boot = new ServerBootstrap();
                boot.Group(bossEventLoop, workerLoopGroup)
                    .Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 100)
                    .ChildOption(ChannelOption.SoKeepalive, true)
                    .Handler(new LoggingHandler("netty server"))
                    .ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        channel.Pipeline.AddLast(new NettyServerHandler());
                    }));
                List<IChannel> list = new List<IChannel>();
                foreach(var item in port)
                {
                    IChannel boundChannel = await boot.BindAsync(item);
                    list.Add(boundChannel);
                }
                Console.WriteLine("按任意鍵退出");
                Console.ReadLine();
                list.ForEach(r =>
                {
                    r.CloseAsync();
                });
                //await boundChannel.CloseAsync();
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                await bossEventLoop.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
                await workerLoopGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
            }
        }
    }

 管理處理器,簡單列印一下收到的內容

 public class NettyServerHandler: ChannelHandlerAdapter
    {
        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            if (message is IByteBuffer buffer)
            {
                StringBuilder sb = new StringBuilder();
                while (buffer.IsReadable())
                {
                    sb.Append(buffer.ReadByte().ToString("X2"));
                }
                Console.WriteLine($"RECIVED FROM CLIENT : {sb.ToString()}");
            }
        }
        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
        {
            Console.WriteLine("Exception: " + exception);
            context.CloseAsync();
        }
    }

  入口方法啟動服務,監聽四個埠

static void Main(string[] args)
        {
            new NettyServer().RunAsync(new int[] { 9001 ,9002,9003,9004}).Wait();
        }

二,在應用層解決TCP的粘包問題

要在應用層解決粘包問題,只需要讓程式知道資料流什麼時候開始,什麼時候結束。按常用協議規劃,可以有以下四種方案:

1,定長協議解碼。每次傳送的資料包長度為固定的,這種協議最簡單,但沒有靈活性。

粘包處理:根據固定的長度處理。如協議長度為4,傳送方傳送資料包 FF 00 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA

拆包處理:根據固定的長度處理。如協議長度為4,傳送方傳送二個數據包 FF 00, 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA

Dotnetty實現:監聽9001埠,處理室長協議資料包

新建一個管理處理器,當快取中的可讀長度達到4,處理資料包,管道向下執行,否則不處理。

  public class KsLengthfixedHandler : ByteToMessageDecoder
    {
        protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
        {
            if (input.ReadableBytes < 12) {
                return; // (3)
            }
            output.Add(input.ReadBytes(12)); // (4)
        }
    }

在啟動伺服器是根據埠號加入不同的管道處理器

.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
}

2,行解碼,即每個資料包的結尾都是/r/n或者/n(換成16進製為0d0a)。  

粘包處理:根據換行符做分隔處理。如傳送方傳送一包資料 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

拆包處理:根據換行符做分隔處理。如傳送方傳送二包資料 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

Dotnetty實現:監聽9002埠,處理行結尾協議資料包

.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        } 
                        else if (ip.Port == 9002)
                        {
                            channel.Pipeline.AddLast(new LineBasedFrameDecoder(
                                maxLength: 1024, //可接收資料包最大長度
                                stripDelimiter: true, //解碼後的資料包是否去掉分隔符
                                failFast: false //是否讀取超過最大長度的資料包內容
                                ));
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
}

3,特定的分隔符解碼。與行解碼規定了以換行符做分隔符不同,這個解碼方案可以自己定義分隔符。

粘包處理:根據自定義的分隔符做分隔處理。如傳送方傳送一包資料 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

拆包處理:根據自定義的分隔符做分隔處理。如傳送方傳送二包資料 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

下面的例項是自定義了一個以“}”為分隔符的解碼器

Dotnetty實現:監聽9003埠,處理自定義分隔符協議資料包

.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        } 
                        else if (ip.Port == 9002)
                        {
                            channel.Pipeline.AddLast(new LineBasedFrameDecoder(
                                maxLength: 1024, //可接收資料包最大長度
                                stripDelimiter: true, //解碼後的資料包是否去掉分隔符
                                failFast: false //是否讀取超過最大長度的資料包內容
                                ));
                        }
                        else if (ip.Port == 9003)
                        {
                            IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}"));
                            channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder(
                               maxFrameLength: 1024,
                               stripDelimiter: true,
                               failFast: false, 
                              delimiter:  delimiter));
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
}

4,指定長度標識。與第一種方案的固定長度不同,這種方案可以指定一個位置存放該資料包的長度,以便程式推算出開始及結束位置。Modbus協議是典型的變長協議。

Dotnetty中使用LengthFieldBasedFrameDecoder解碼器對變長協議解析,瞭解下以下幾個引數:

  * +------+--------+------+----------------+
  * | HDR1 | Length | HDR2 | Actual Content |
  * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |
  * +------+--------+------+----------------+   

1,maxFrameLength:資料包最大長度

2,lengthFieldOffset:長度標識的偏移量。如上面的協議,lengthFieldOffset為1(HDR1的長度)

3,lengthFieldLength:長度標識位的長度。如上面的協議,lengthFieldLength為2(Length的長度)

4,lengthAdjustment:調整長度。如上面的協議,0x000c轉為10進製為12,只標識了Content的長度,並不包括HDR2的長度,在解析時就要設定該默值為HDR2的長度。

5,initialBytesToStrip:從何處開始剝離。如上面的協議,如果想要的解析結果為:0xFE "HELLO, WORLD" 則將initialBytesToStrip設定為3(HDR1的長度+Length的長度)。

Dotnetty實現:監聽9004埠處理變長協議

  .ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        } 
                        else if (ip.Port == 9002)
                        {
                            channel.Pipeline.AddLast(new LineBasedFrameDecoder(
                                maxLength: 1024, //可接收資料包最大長度
                                stripDelimiter: true, //解碼後的資料包是否去掉分隔符
                                failFast: false //是否讀取超過最大長度的資料包內容
                                ));
                        }
                        else if (ip.Port == 9003)
                        {
                            IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}"));
                            channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder(
                               maxFrameLength: 1024,
                               stripDelimiter: true,
                               failFast: false, 
                              delimiter:  delimiter));
                        }
                        else if (ip.Port == 9004)
                        {
                            channel.Pipeline.AddLast(new LengthFieldBasedFrameDecoder(
                               maxFrameLength: 1024,
                               lengthFieldOffset: 1, 
                               lengthFieldLength: 2));
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
                    }));