1. 程式人生 > >使用Newlife網絡庫管道模式解決數據粘包(二)

使用Newlife網絡庫管道模式解決數據粘包(二)

sub throw 數組 服務端 cit 需要 存在 bubuko reverse

上一篇我們講了 如何創建一個基本的Newlife網絡服務端 這邊我們來講一下如何解決粘包的問題

在上一篇總我們註冊了Newlife的管道處理器 ,我們來看看他是如何實現粘包處理的

svr.Add<ReciveFilter>();//粘包處理管道

首先看一下我們設備的上傳數據協議

技術分享圖片

設備上報的數據包頭包含了固定的包頭包尾,整個包的數據長度,設備編號。

包頭:板卡類型,幀類型 2個字節 0x01 0x70

幀長度: 為兩個字節 並且數據的字節序為 高字節在前 ,C#正常默認為低字節在前。

設備號:15位的ASCII 字符串

包尾: 兩個字節 0x0D 0x0A 固定

下面來解決粘包的問題

Newlife網絡庫提供了幾種常見的封包協議來解決粘包的問題,其中有一個 LengthFieldCodec解碼器 這個解碼器以長度字段作為頭部 恰好符合我們的需求,我們就以這個解碼器稍作改造來解決我們的粘包問題吧

由於這個解碼器是適用於 只包含包頭和包體的數據結構,且長度為包體的長度,而我們的協議 是包含包頭包體包尾,並且幀長度為整個包的長度,長度為高位在前的數據結構,所以我們需要對整個解碼器稍微做一些改造來符合我們的數據結構 。

我們來看下代碼 其中

        #region 屬性
        /// <summary>長度所在位置</summary>
public Int32 Offset { get; set; }=2; /// <summary>長度占據字節數,1/2/4個字節,0表示壓縮編碼整數,默認2</summary> public Int32 Size { get; set; } = 2; /// <summary>過期時間,超過該時間後按廢棄數據處理,默認500ms</summary> public Int32 Expire { get; set; } = 500; #endregion

在我們的協議中可以看到 設置了數據包的長度位置,長度占據的字節數,下面我們來獲取一下整個包的長度

/// <summary>從數據流中獲取整幀數據長度</summary>
        /// <param name="pk"></param>
        /// <param name="offset"></param>
        /// <param name="size"></param>
        /// <returns>數據幀長度(包含頭部長度位)</returns>
        protected  Int32 GetLength(Packet pk, Int32 offset, Int32 size)
        {
            if (offset < 0) return pk.Total - pk.Offset;
            // 數據不夠,連長度都讀取不了
            if (offset >= pk.Total) return 0;

            // 讀取大小
            var len = 0;
            switch (size)
            {
                case 2:
                    var lenArry = pk.ReadBytes(offset, 2);
                    //高位在前,反轉數組,獲取長度
                    Array.Reverse(lenArry);
                    len = lenArry.ToUInt16();
                    break;
                default:
                    throw new NotSupportedException();
            }

            // 判斷後續數據是否足夠
            if (len > pk.Total) return 0;

            return len;
        }

獲取長度後我們就可以從數據流中讀取一個完整的包了

        /// <summary>解碼</summary>
        /// <param name="context"></param>
        /// <param name="pk"></param>
        /// <returns></returns>
        protected override IList<Packet> Decode(IHandlerContext context, Packet pk)
        {
            var ss = context.Owner as IExtend;
            var mcp = ss["CodecItem"] as CodecItem;
            if (mcp == null) ss["CodecItem"] = mcp = new CodecItem();

            var pks = ParseNew(pk, mcp, 0, ms => GetLength(ms, Offset, Size), Expire);

            // 跳過頭部長度
            var len = Offset + Math.Abs(Size);
            foreach (var item in pks)
            {
                item.Set(item.Data, item.Offset + len, item.Count - len);
                //item.SetSub(len, item.Count - len);
            }

            return pks;
        }


        #region 粘包處理
        /// <summary>分析數據流,得到一幀數據</summary>
        /// <param name="pk">待分析數據包</param>
        /// <param name="codec">參數</param>
        /// <param name="getLength">獲取長度</param>
        /// <param name="expire">緩存有效期</param>
        /// <returns></returns>
        protected IList<Packet> ParseNew(Packet pk, CodecItem codec, int startIndex, Func<Packet, Int32> getLength, Int32 expire = 5000)
        {
            var _ms = codec.Stream;
            var nodata = _ms == null || _ms.Position < 0 || _ms.Position >= _ms.Length;

            var list = new List<Packet>();
            // 內部緩存沒有數據,直接判斷輸入數據流是否剛好一幀數據,快速處理,絕大多數是這種場景
            if (nodata)
            {
                if (pk == null) return list.ToArray();

                var idx = 0;
                while (idx < pk.Total)
                {
                    //var pk2 = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                    var pk2 = pk.Slice(idx);
                    var len = getLength(pk2);
                    if (len <= 0 || len > pk2.Count) break;

                    pk2.Set(pk2.Data, startIndex, len);
                    //pk2.SetSub(0, len);
                    list.Add(pk2);
                    idx += len;
                }
                // 如果沒有剩余,可以返回
                if (idx == pk.Total) return list.ToArray();

                // 剩下的
                //pk = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                pk = pk.Slice(idx);
            }

            if (_ms == null) codec.Stream = _ms = new MemoryStream();

            // 加鎖,避免多線程沖突
            lock (_ms)
            {
                // 超過該時間後按廢棄數據處理
                var now = TimerX.Now;
                if (_ms.Length > _ms.Position && codec.Last.AddMilliseconds(expire) < now)
                {
                    _ms.SetLength(0);
                    _ms.Position = 0;
                }
                codec.Last = now;

                // 合並數據到最後面
                if (pk != null && pk.Total > 0)
                {
                    var p = _ms.Position;
                    _ms.Position = _ms.Length;
                    pk.WriteTo(_ms);
                    _ms.Position = p;
                }

                // 嘗試解包
                while (_ms.Position < _ms.Length)
                {
                    //var pk2 = new Packet(_ms.GetBuffer(), (Int32)_ms.Position, (Int32)_ms.Length);
                    var pk2 = new Packet(_ms);
                    var len = getLength(pk2);

                    // 資源不足一包
                    if (len <= 0 || len > pk2.Total) break;

                    // 解包成功
                    pk2.Set(pk2.Data, startIndex, len);
                    //pk2.SetSub(0, len);
                    list.Add(pk2);

                    _ms.Seek(len, SeekOrigin.Current);
                }

                // 如果讀完了數據,需要重置緩沖區
                if (_ms.Position >= _ms.Length)
                {
                    _ms.SetLength(0);
                    _ms.Position = 0;
                }

                return list;
            }
        }

粘包處理管道完成後,就可以在Recive中去處理一個完整的數據包啦,我來解析一下這個狀態的數據並且來保存設備連接

首先定義一個字典項用來保存設備的連接信息.設備號,連接的SessionId

   /// <summary>
        /// newLife連接保持 
        /// </summary>
        private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();



由於我們的數據中 幀類型不同的請求中幀類型是不一樣的 所以解析數據需要做區分處理 我們來或者狀態上傳信息中的設備號並且和連接關聯

 private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();
        private object _lock=new object();
        private void Recive(object sender, ReceivedEventArgs e)
        {

            INetSession session = (INetSession)sender;
            var pk = e.Message as Packet;
            if (pk.Count == 0)
            {
                XTrace.WriteLine("數據包解析錯誤");
                return;

            }
            try
            {
                //數據包
                var respBytes = pk.Data;
                //獲取幀類型
                var dataTypeBytes = respBytes[1];

                if (dataTypeBytes == 0x70)
                {
                    //數值
                    byte[] deviceNoByte = new byte[15];
                    Buffer.BlockCopy(respBytes, 4, deviceNoByte, 0, 15); //從緩沖區裏讀取包頭的字節
                    string deviceNo = Encoding.ASCII.GetString(deviceNoByte);
                    XTrace.WriteLine("設備編號:" + deviceNo);
//保存連接信息
SaveClientConnection(deviceNo, session.ID);
//獲取設備號後保存連接信息 } //支付寶 } catch (Exception ex) { XTrace.WriteLine(ex.Message); } } /// <summary> /// 保存在線信息 /// </summary> /// <param name="deviceNo"></param> /// <param name="sessionId"></param> private void SaveClientConnection(string deviceNo, int sessionId) { lock (_lock) { if (OnLineClients.ContainsKey(deviceNo)) { OnLineClients[deviceNo] = sessionId; } else { OnLineClients.Add(deviceNo,sessionId); } } }

好了數據粘包問題解決啦同時保存了設備連接信息,下面來解決如何定時檢查測試在線狀態。

使用Newlife網絡庫管道模式解決數據粘包(二)