動手實現一個較為簡單的MQTT服務端和客戶端
專案地址:https://github.com/hnlyf168/DotNet.Framework
Install-Package DotNetCN -Version 3.0.0
昨天晚上大致測試了下 ,490個客戶端(一個收一個發) 平均估計每個每秒60個包 使用mqtt協議 傳送一個guid的字串 伺服器轉發每秒大約1.2-1.3w
cpu 佔用:25% 一下
記憶體好像都在50m以下
1、協議簡介
MQTT是一個基於客戶端-伺服器的訊息釋出/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通訊和物聯網(IoT)。其在,通過衛星鏈路通訊感測器、偶爾撥號的醫療裝置、智慧家居、及一些小型化裝置中已廣泛使用。
具體就不在這裡記錄了,寫這個服務端和客戶端也只是為了更加深入的學習mqtt協議。
2、mqtt的幾種控制報文型別
名字 |
值 |
報文流動方向 |
描述 |
Reserved |
0 |
禁止 |
保留 |
CONNECT |
1 |
客戶端到服務端 |
客戶端請求連線服務端 |
CONNACK |
2 |
服務端到客戶端 |
連線報文確認 |
PUBLISH |
3 |
兩個方向都允許 |
釋出訊息 |
PUBACK |
4 |
兩個方向都允許 |
QoS 1訊息釋出收到確認 |
PUBREC |
5 |
兩個方向都允許 |
釋出收到(保證交付第一步) |
PUBREL |
6 |
兩個方向都允許 |
釋出釋放(保證交付第二步) |
PUBCOMP |
7 |
兩個方向都允許 |
QoS 2訊息釋出完成(保證互動第三步) |
SUBSCRIBE |
8 |
客戶端到服務端 |
客戶端訂閱請求 |
SUBACK |
9 |
服務端到客戶端 |
訂閱請求報文確認 |
UNSUBSCRIBE |
10 |
客戶端到服務端 |
客戶端取消訂閱請求 |
UNSUBACK |
11 |
服務端到客戶端 |
取消訂閱報文確認 |
PINGREQ |
12 |
客戶端到服務端 |
心跳請求 |
PINGRESP |
13 |
服務端到客戶端 |
心跳響應 |
DISCONNECT |
14 |
客戶端到服務端 |
客戶端斷開連線 |
Reserved |
15 |
禁止 |
保留 |
2.1、協議頭
每個MQTT控制報文都包含一個固定報頭,
固定報頭的格式
Bit |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
byte 1 |
MQTT控制報文的型別 |
用於指定控制報文型別的標誌位 |
||||||
byte 2... |
|
剩餘長度 |
剩餘長度的計算方式:
剩餘長度(Remaining Length)表示當前報文剩餘部分的位元組數,包括可變報頭和負載的資料。剩餘長度不包括用於編碼剩餘長度欄位本身的位元組數。
剩餘長度欄位使用一個變長度編碼方案,對小於128的值它使用單位元組編碼。更大的值按下面的方式處理。低7位有效位用於編碼資料,最高有效位用於指示是否有更多的位元組。因此每個位元組可以編碼128個數值和一個延續位(continuation bit)。剩餘長度欄位最大4個位元組
例如,十進位制數64會被編碼為一個位元組,數值是64,十六進位制表示為0x40,。十進位制數字
321(=65+2*128)被編碼為兩個位元組,最低有效位在前。第一個位元組是 65+128=193。注意最高位為1表示後面至少還有一個位元組。第二個位元組是2。
.net 計算方式程式碼如下:
/// <summary> /// 獲取一個長度資料 /// </summary> /// <returns></returns> protected virtual Result<int> ReadLength() { var result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } var msgType = result.Data[0]; var msgLength = msgType & 127;//取低7為的值,因為可變長度有效值只有低7位,第8位用來標識下一個位元組是否屬於長度位元組 var leftBit = 7; while (msgType >> 7 == 1)//判斷最高位是否為1,如果為1則說明後面的1個位元組也是屬於長度位元組 { result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } msgType = result.Data[0]; msgLength = ((msgType & 127) << leftBit) | msgLength;// 因為mqtt 可變長度的位元組是低位在前,所以新取到的長度要左移取到的次數*7位在|原來的長度。 leftBit += 7; } return msgLength; }
2.2、CONNECT – 連線服務端
協議格式
可看到所需要的引數,於是定義一個連線資訊類來儲存
/// <summary> /// mqtt 連線資訊。 /// </summary> public class MQTTConnectInfo { /// <summary> /// 客戶端編號 /// </summary> public virtual string ClientId { get; set; } /// <summary> /// 使用者名稱 /// </summary> public virtual string UserName { get; set; } /// <summary> /// 使用者密碼 /// </summary> public virtual string Password { get; set; } /// <summary> /// 遺囑保留 /// </summary> public virtual bool WillRetain { get; set; } /// <summary> /// 遺囑QoS /// </summary> public virtual Qos WillQos { get; set; } /// <summary> /// 遺囑標誌 /// </summary> public virtual bool WillFlag { get; set; } /// <summary> /// 是否清除對話。 /// </summary> public virtual bool CleanSession { get; set; } /// <summary> /// 保持連線 /// <para>警告:這裡的單位是秒</para> /// </summary> public virtual ushort KeepAlive { get; set; } = 10; }View Code
然後就是程式碼按協議格式組裝好
程式碼如下:
/// <summary> /// 獲取包完整的位元組 /// </summary> /// <returns></returns> public override byte[] ToBytes() { var list = new List<byte>(); var mqttBytes = ProtocolName.ToBytes(Encoding.ASCII);//協議名稱:固定位MQTT list.Add((byte)(mqttBytes.Length >> 8)); list.Add((byte)(mqttBytes.Length & 255)); list.AddRange(mqttBytes); list.Add(Version);//協議版本 list.Add(ConnectFlag);//連線標識 list.Add((byte)(KeepAlive >> 8));//心跳值 list.Add((byte)(KeepAlive & 255)); var clientIdBytes = ClientId.ToBytes(Encoding.ASCII);//客戶端編號 list.Add((byte)(clientIdBytes.Length >> 8)); list.Add((byte)(clientIdBytes.Length & 255)); list.AddRange(clientIdBytes); if (HasUserName)//是否包含使用者名稱 { var userNameBytes = UserName.ToBytes(Encoding.ASCII); list.Add((byte)(userNameBytes.Length >> 8)); list.Add((byte)(userNameBytes.Length & 255)); list.AddRange(userNameBytes); } if (HasPassword)//是否包含使用者密碼 { var passwordBytes = Password.ToBytes(Encoding.ASCII); list.Add((byte)(passwordBytes.Length >> 8)); list.Add((byte)(passwordBytes.Length & 255)); list.AddRange(passwordBytes); } Data = list.ToArray(); list.Clear(); return base.ToBytes(); }View Code
連接回復包格式:
|
描述 |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
||||||||
連線確認標誌 |
Reserved 保留位 |
1 SP |
|
|
|
|
|
|
|
||||||||
byte 1 |
|
0 |
0 |
0 |
0 |
0 |
0 |
0 |
X |
||||||||
連線返回碼 |
|
|
|
|
|
|
|
|
|
||||||||
byte 2 |
|
X |
X |
X |
X |
X |
X |
X |
X |
||||||||
|
描述 |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
||||||||
連線確認標誌 |
|
Reserved 保留位 |
|
|
1 SP |
||||||||||||
byte 1 |
|
0 |
0 |
0 |
0 |
0 |
0 |
0 |
X |
||||||||
|
|
連線返回碼 |
|
|
|
||||||||||||
byte 2 |
|
X |
X |
X |
X |
X |
X |
X |
X |
,猶豫程式碼比較簡單這裡就不貼了
2.3、PUBLISH – 釋出訊息
PUBLISH控制報文是指從客戶端向服務端或者服務端向客戶端傳輸一個應用訊息
主題長度 16位 2位元組 |
主題內容N |
如果QoS大於0 則有一個訊息Id 16位 2位元組 |
剩餘的N 是訊息的主題 |
|
描述 |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
Topic Name 主題名 |
|
|
|
|
|
|
|
|
|
byte 1 |
Length MSB (0) |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
byte 2 |
Length LSB (3) |
0 |
0 |
0 |
0 |
0 |
0 |
1 |
1 |
byte 3 |
‘a’ (0x61) |
0 |
1 |
1 |
0 |
0 |
0 |
0 |
1 |
byte 4 |
‘/’ (0x2F) |
0 |
0 |
1 |
0 |
1 |
1 |
1 |
1 |
byte 5 |
‘b’ (0x62) |
0 |
1 |
1 |
0 |
0 |
0 |
1 |
0 |
報文識別符號 |
|
|
|
|
|
|
|
|
|
byte 6 |
報文識別符號 MSB (0) |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
byte 7 |
報文識別符號 LSB (10) |
0 |
0 |
0 |
0 |
1 |
0 |
1 |
0 |
程式碼按協議格式組裝如下:
/// <summary> /// 開始組裝包。 /// </summary> protected override void Packaging() { var topicBytes = Topic.ToBytes();//主題資料 Data = new byte[topicBytes.Length + BodyBytes.Length + (QoS > 0 ? 4 : 2)]; Data[0] = (byte)(topicBytes.Length >> 8); Data[1] = (byte)(topicBytes.Length & 255); topicBytes.CopyTo(Data, 2); if (QoS > 0) { Data[topicBytes.Length + 2] = (byte)(Identifier >> 8); Data[topicBytes.Length + 3] = (byte)(Identifier & 255); } BodyBytes.CopyTo(Data, Data.Length - BodyBytes.Length);//複製訊息內容 topicBytes = null; }
後面的格式我就不在一一寫出來了 ,附上一個mqtt協議文件
MQTT協議中文版
這裡還有一個非常重要的自己寫的一個Socket 輔助類,主要實現高效能的讀取和傳送訊息,已整包的形式,這樣避免粘包等問題
/// <summary> /// Berkeley 套接字 輔助 /// </summary> public abstract class SocketClient<Package> where Package : IDataPackage { private Socket m_Socket; private Timer timerHeartbeat; private System.Net.EndPoint remoteEndPoint; /// <summary> /// 客戶端唯一標識 /// </summary> public virtual long Id { get; set; } /// <summary> /// Berkeley 套接字。 /// </summary> public virtual Socket Socket { get => m_Socket; protected set { m_Socket = value; remoteEndPoint = m_Socket.RemoteEndPoint; } } /// <summary> /// 客戶端的遠端資訊。 /// </summary> public virtual System.Net.EndPoint RemoteEndPoint { get => remoteEndPoint; } /// <summary> /// 心跳執行緒 /// </summary> protected virtual Timer TimerHeartbeat { get => timerHeartbeat; } /// <summary> /// 心跳時間。 /// </summary> public virtual int KeepAlive { get; set; } = 180000; /// <summary> /// 初始化 /// </summary> /// <param name="socket"></param> protected SocketClient(Socket socket) { Socket = socket; } /// <summary> /// 初始化 /// </summary> protected SocketClient() { } /// <summary> /// 讀取一個完整的包。 /// </summary> /// <returns></returns> protected abstract DotNet.Result<Package> ReceivePackage(); /// <summary> /// 開始迴圈讀取訊息。 /// </summary> public virtual void OnReceive() { while (!IsClose) { try { OnHeartbeatTimer(); var bytesResult = ReceivePackage(); if (bytesResult.Success) { OnNewDataPackage(bytesResult); } else { WriteLog($"接收包時錯誤,錯誤內容:{bytesResult.Message}"); if (bytesResult.Code == -1) { this.Close(); } } } catch (Exception ex) { WriteErrorLog($"接收包時異常", ex); } } Close(); } /// <summary> /// 當接收到 /// </summary> /// <param name="bytesResult"></param> protected virtual void OnNewDataPackage(Result<Package> bytesResult) { try { // 這裡使用非同步會有一個問題,就是如果一個客戶端while(true)在發訊息,會導致伺服器執行緒被一個客戶端佔滿而無法處理其他的客戶端。 OnHandleDataPackage(bytesResult.Data); } catch (Exception ex) { WriteErrorLog($"客戶端處理包時報錯", ex); } } #if NET40 /// <summary> /// 啟用非同步讀取 /// </summary> /// <returns></returns> public virtual Task OnReceiveAsync() { return Task.Factory.StartNew(OnReceive); } #else /// <summary> /// 啟用非同步讀取 /// </summary> /// <returns></returns> public virtual async Task OnReceiveAsync() { await Task.Run(() => { OnReceive(); }); } #endif private bool m_IsClose; /// <summary> /// 是否已經關閉 /// </summary> public virtual bool IsClose => m_IsClose; /// <summary> /// 關閉連線,並退出當前執行緒 /// </summary> public virtual void Close(int timeout = 3) { lock (this) { if (!IsClose) { m_IsClose = true; WriteLog($"關閉連線"); OnClose(); //真正關閉,避免二次關閉 } } Socket?.Close(timeout); Socket?.Dispose(); timerHeartbeat?.Dispose(); } /// <summary> /// 關閉連線並退出。 /// </summary> protected abstract void OnClose(); /// <summary> /// 設定心跳計數器 /// </summary> protected virtual void OnHeartbeatTimer() { if (timerHeartbeat == null) { timerHeartbeat = new Timer(OnHeartbeatTimerCallback, this, KeepAlive, KeepAlive); } else { timerHeartbeat.Change(KeepAlive, KeepAlive); } } /// <summary> /// 心跳實際到達後觸發,改方法又心跳計數器執行。 /// </summary> /// <param name="state"></param> protected virtual void OnHeartbeatTimerCallback(object state) { WriteLog($"客戶端{KeepAlive}s未發包,已丟棄"); Close(); } /// <summary> /// 寫入日誌。 /// </summary> /// <param name="text">日誌內容</param> public virtual void WriteLog(string text) { // Log.WriteLog($" 連線{RemoteEndPoint}-{text}"); } /// <summary> /// 寫入錯誤資訊到日誌。 /// </summary> /// <param name="text">錯誤資訊描述</param> /// <param name="exception">異常資訊</param> public virtual void WriteErrorLog(string text, Exception exception = null) { // Log.WriteErrorLog($" 連線{RemoteEndPoint}-{text}", exception); } /// <summary> /// 寫入日誌。 /// </summary> /// <param name="text">日誌內容</param> /// <param name="args"></param> public virtual void WriteLog(string text, params object[] args) { WriteLog(string.Format(text, args)); } /// <summary> /// 開始處理接收的包 /// </summary> /// <param name="dataPackage"></param> protected abstract void OnHandleDataPackage(Package dataPackage); /// <summary> /// 傳送資料 /// </summary> /// <param name="bytes"></param> public virtual Result SendBytes(byte[] bytes) { lock (this) { if (!IsClose) { try { Socket.Send(bytes); return true; } catch (Exception ex) { WriteErrorLog($"傳送資料{bytes.ToBase64String()}", ex); if (!Socket.Connected) { Close(); } } } } return false; } }
其mqtt的Socket實現子類如下:
/// <summary> /// mqtt 伺服器連線過來的客戶端。 /// </summary> public class MQTTSocketClient : DotNet.Net.SocketClient<MQTTDataPackage> { /// <summary> /// 表示mqtt伺服器。 /// </summary> public virtual MQTTServer TcpServer { get; set; } /// <summary> /// 獲取一個值,該值指示客戶端是否傳送過了連線協議包。 /// </summary> public virtual bool IsConnect { get; protected set; } private readonly List<TopicDataPackage> subscribeTopics = new List<TopicDataPackage>(); /// <summary> /// 訂閱主題。 /// </summary> public TopicDataPackage[] SubscribeTopics { get => subscribeTopics.ToArray(); } /// <summary> /// 當前訊息序號 /// </summary> public virtual ushort Identifier { get; set; } /// <summary> /// 客戶端連線編號 /// </summary> public virtual string ClientId { get; set; } /// <summary> /// 客戶端唯一連線id /// </summary> public override long Id { get { if (long.TryParse(ClientId, out long id)) { base.Id = id; } else { base.Id = ClientId.GetHashCode(); } return base.Id; } set { ClientId = value.ToString(); } } /// <summary> /// 寫日誌。 /// </summary> /// <param name="text"></param> public override void WriteLog(string text) { if (ClientId != null) { text = $"客戶端編號:{ClientId}:{text}"; } // base.WriteLog(text); } /// <summary> /// 使用<see cref="Socket"/>客戶端初始化。 /// </summary> /// <param name="socket"></param> public MQTTSocketClient(Socket socket) : base(socket) { } /// <summary> /// 關閉服務端連線 /// </summary> protected override void OnClose() { Console.WriteLine($"{ClientId}關閉連線"); } /// <summary> /// 處理收到的包 /// </summary> /// <param name="dataPackage"></param> protected override void OnHandleDataPackage(MQTTDataPackage dataPackage) { try { WriteLog($"收到{dataPackage.MessageType} 包, QoS level:{dataPackage.QoS}"); if (IsConnect && dataPackage.MessageType != MessageType.Connect) { WriteLog($"收到{dataPackage.MessageType} 包, QoS level:{dataPackage.QoS} ,但連線尚未登入,被拋棄"); this.Close(); } switch (dataPackage.MessageType) { case MessageType.Connect: OnConnect(dataPackage); break; case MessageType.Subscribe: OnSubscribe(dataPackage); break; case MessageType.PingRequest: OnPingRequest(dataPackage); break; case MessageType.Publish: OnPublishPackage(dataPackage); break; case MessageType.UnSubscribe: OnUnSubscribe(dataPackage); break; case MessageType.Disconnect: this.Close(); break; } } catch (Exception ex) { } dataPackage = null; } #if NET40 /// <summary> /// 當收到釋出訊息 /// </summary> /// <param name="dataPackage"></param> protected virtual Task OnPublishPackage(MQTTDataPackage dataPackage) { return Task.Factory.StartNew(() => { #else /// <summary> /// 當收到釋出訊息 /// </summary> /// <param name="dataPackage"></param> /// <returns></returns> protected virtual async Task OnPublishPackage(MQTTDataPackage dataPackage) { await Task.Run(() => { #endif try { PublishDataPackage publishDataPackage = new PublishDataPackage(dataPackage); var result = OnPublish(publishDataPackage); if (dataPackage.QoS > 0) { var package = new MQTTDataPackage() { MessageType = MessageType.PublishAck, Data = new byte[3] { (byte)(publishDataPackage.Identifier >> 8), (byte)(publishDataPackage.Identifier & 255), 0 } }; if (dataPackage.QoS == 1) { if (!result.Success) { package.Data[2] = 1; } //SendPackage(package); } } } catch (Exception ex) { } }); } /// <summary> /// 當客戶釋出訊息。 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnPublish(PublishDataPackage message) { WriteLog($"客戶端{message.ClientId}釋出訊息{message.Topic},QoS{message.QoS}。內容:{message.Text}"); try { foreach (var client in TcpServer.Clients) { foreach (var topic in client.SubscribeTopics) { if (MqttTopicFilterComparer.IsMatch(message.Topic, topic.Topic)) { var temp = message.Clone(); temp.QoS = 0;// Math.Min(message.QoS, topic.QoS);//mqtt協議規定,取訂閱主題和傳送主題中最小的qos值。 client.Publish(temp); } } } } catch (Exception ex) { } return true; } /// <summary> /// 釋出訊息。 /// </summary> /// <param name="message">要釋出的訊息。</param> /// <returns></returns> public virtual Result Publish(PublishDataPackage message) { message.Identifier = ++Identifier; this.SendPackage(message);//目前不校驗,qos 直接傳送 return true; } /// <summary> /// 當客戶端傳送了ping 請求 /// </summary> /// <param name="dataPackage"></param> protected virtual void OnPingRequest(MQTTDataPackage dataPackage) { var package = new MQTTDataPackage() { MessageType = MessageType.PingResponse }; SendPackage(package); } /// <summary> /// 發生訂閱訊息 /// </summary> /// <param name="dataPackage"></param> private void OnSubscribe(MQTTDataPackage dataPackage) { TopicDataPackage topicDataPackage = new TopicDataPackage(dataPackage); var result = OnSubscribe(topicDataPackage); var package = new SubscribeAckDataPackage() { Identifier = topicDataPackage.Identifier, Success = result.Success }; if (result.Success) { if (!subscribeTopics.Contains(topicDataPackage)) { subscribeTopics.Add(topicDataPackage); } package.ValidQos = Qos.QoS2;// } SendPackage(package); } /// <summary> /// 取消訂閱訊息 /// </summary> /// <param name="dataPackage"></param> private void OnUnSubscribe(MQTTDataPackage dataPackage) { TopicDataPackage topicDataPackage = new TopicDataPackage(dataPackage); var result = OnUnSubscribe(topicDataPackage); if (result.Success) { if (subscribeTopics.Contains(topicDataPackage)) { subscribeTopics.Remove(topicDataPackage); } var package = new IdentifierAckDataPackage(MessageType.UnSubscribeAck) { Identifier = topicDataPackage.Identifier }; SendPackage(package); } } /// <summary> /// 當收到 取消訂閱主題訊息時。 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnUnSubscribe(TopicDataPackage message) { WriteLog($"客戶端{message.ClientId} 取消訂閱{message.Topic},QoS{message.QoS}"); return true; } /// <summary> /// 當收到訂閱主題訊息時。 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnSubscribe(TopicDataPackage message) { WriteLog($"客戶端{message.ClientId}訂閱{message.Topic},QoS{message.RequestedQoS}"); return true; } /// <summary> /// 當客戶端傳送連線請求時。 /// </summary> /// <param name="dataPackage">連線請求的包</param> private void OnConnect(MQTTDataPackage dataPackage) { ConnectDataPackage connectDataPackage = new ConnectDataPackage(dataPackage); var result = OnClientConnect(connectDataPackage); var client = TcpServer.GetClientById(connectDataPackage.ClientId); if (client.Success) { client.Data.WriteLog($"新的客戶端連線{this.RemoteEndPoint}上線,舊連線關閉"); client.Data.Close(); } ClientId = connectDataPackage.ClientId; this.KeepAlive = Convert.ToInt32(connectDataPackage.KeepAlive * 1000 * 1.5); var package = new ConnectAckDataPackage() { Result = result }; SendPackage(package); } /// <summary> /// 傳送一個標準的mqtt包到客戶端連線。 /// </summary> /// <param name="package"></param> public virtual void SendPackage(MQTTDataPackage package) { WriteLog($"傳送{package.MessageType}包,QOS:{package.QoS}"); this.SendBytes(package.ToBytes()); } /// <summary> /// 當客戶端連線到服務驗證是否可以連線 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnClientConnect(ConnectDataPackage message) { WriteLog($"客戶端{message.ProtocolName}連線,客戶端編號{message.ClientId},使用者名稱:{message.UserName},密碼:{message.Password},CeanSession:{message.CeanSession}"); return true; } /// <summary> /// 接收一個完整的包。 /// </summary> /// <returns></returns> protected override Result<MQTTDataPackage> ReceivePackage() { Result<byte[]> result; Result<MQTTDataPackage> resultPackage = new Result<MQTTDataPackage>() { Success = false }; MQTTDataPackage package = new MQTTDataPackage() { ClientId = ClientId, RemoteEndPoint = RemoteEndPoint }; result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 頭 首位元組失敗"); this.Close(); return resultPackage; } package.Header = result.Data[0]; var msgLengthResult = ReadLength(); if (!msgLengthResult.Success) { WriteLog(msgLengthResult.Message); return resultPackage; } result = this.Socket.ReceiveBytes(msgLengthResult.Data); if (!result.Success) { WriteLog($"獲取資料長度{msgLengthResult.Data}內容失敗"); return resultPackage; } package.Data = result.Data; resultPackage.Data = package; resultPackage.Success = true; resultPackage.Message = "獲取包成功"; return resultPackage; } /// <summary> /// 獲取一個長度資料 /// </summary> /// <returns></returns> protected virtual Result<int> ReadLength() { var result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } var msgType = result.Data[0]; var msgLength = msgType & 127;//取低7為的值,因為可變長度有效值只有低7位,第8位用來標識下一個位元組是否屬於長度位元組 var leftBit = 7; while (msgType >> 7 == 1)//判斷最高位是否為1,如果為1則說明後面的1個位元組也是屬於長度位元組 { result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } msgType = result.Data[0]; msgLength = ((msgType & 127) << leftBit) | msgLength;// 因為mqtt 可變長度的位元組是低位在前,所以新取到的長度要左移取到的次數*7位在|原來的長度。 leftBit += 7; } return msgLength; } }