MQTTnet 實現MQTT 客戶端和服務端
阿新 • • 發佈:2019-01-02
服務端:
class Program { private static MqttServer mqttServer = null; static void Main(string[] args) { MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished; new Thread(StartMqttServer).Start(); while (true) { var inputString = Console.ReadLine().ToLower().Trim(); if (inputString == "exit") { mqttServer?.StopAsync(); Console.WriteLine("MQTT服務已停止!"); break; } else if (inputString == "clients") { foreach (var item in mqttServer.GetConnectedClients()) { Console.WriteLine($"客戶端標識:{item.ClientId},協議版本:{item.ProtocolVersion}"); } } else { Console.WriteLine($"命令[{inputString}]無效!"); } } } private static void StartMqttServer() { if (mqttServer == null) { try { var options = new MqttServerOptions { ConnectionValidator = p => { if (p.ClientId == "c001") { if (p.Username != "u001" || p.Password != "p001") { return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; } } return MqttConnectReturnCode.ConnectionAccepted; } }; mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer; mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived; mqttServer.ClientConnected += MqttServer_ClientConnected; mqttServer.ClientDisconnected += MqttServer_ClientDisconnected; } catch (Exception ex) { Console.WriteLine(ex.Message); return; } } mqttServer.StartAsync(); Console.WriteLine("MQTT服務啟動成功!"); } private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e) { Console.WriteLine($"客戶端[{e.Client.ClientId}]已連線,協議版本:{e.Client.ProtocolVersion}"); } private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e) { Console.WriteLine($"客戶端[{e.Client.ClientId}]已斷開連線!"); } private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { Console.WriteLine($"客戶端[{e.ClientId}]>> 主題:{e.ApplicationMessage.Topic} 負荷:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos:{e.ApplicationMessage.QualityOfServiceLevel} 保留:{e.ApplicationMessage.Retain}"); } private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e) { /*Console.WriteLine($">> 執行緒ID:{e.ThreadId} 來源:{e.Source} 跟蹤級別:{e.Level} 訊息: {e.Message}"); if (e.Exception != null) { Console.WriteLine(e.Exception); }*/ } }
客戶端
class Program { private static MqttClient mqttClient = null; static string topic = "測試1/測試2/測試3"; static void Main(string[] args) { mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient; mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived; mqttClient.Connected += MqttClient_Connected; mqttClient.Disconnected += MqttClient_Disconnected; var options = new MqttClientTcpOptions { Server = "127.0.0.1", ClientId = Guid.NewGuid().ToString().Substring(0, 5), UserName = "u001", Password = "p001", CleanSession = true }; mqttClient.ConnectAsync(options); Console.ReadLine(); Console.WriteLine("Hello World!"); } private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { Console.WriteLine($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}"); } private static void MqttClient_Connected(object sender, EventArgs e) { Console.WriteLine("已連線到MQTT伺服器!" + Environment.NewLine); mqttClient.SubscribeAsync(new List<TopicFilter> { new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce) }); Console.WriteLine($"已訂閱[{topic}]主題" + Environment.NewLine); //開始釋出訊息 string inputString = "你好麼"; var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false); mqttClient.PublishAsync(appMsg); } private static void MqttClient_Disconnected(object sender, EventArgs e) { Console.WriteLine("已斷開MQTT連線!" + Environment.NewLine); } }
原始碼下載地址:MQTT
MQTT Windows 端驗證程式: http://mqttfx.jensd.de/index.php/download