1. 程式人生 > >使用MQTTnet連線Mqtt伺服器

使用MQTTnet連線Mqtt伺服器

上篇文章介紹了mqttnet的內容,並使用mqttnet搭建了一個mqtt伺服器。本篇文章將繼續使用mqttnet做一個客戶端,用於連線mqtt伺服器。

client的介面部署入下圖所示,

1、單個mqtt client,可是使用訂閱主題和釋出主題,

2、建立多個mqtt client,測試mqtt server的效能

單個mqtt client連線伺服器

添加了客戶端連線,斷開和訊息到達的事件

private async void MqttClient()
{
    try
    {
		var options = new MqttClientOptions() {ClientId = Guid.NewGuid().ToString("D")};
		options.ChannelOptions = new MqttClientTcpOptions()
		{
			Server = TxbServer.Text,
			Port = Convert.ToInt32(TxbPort.Text)
		};
		options.Credentials = new MqttClientCredentials()
		{
			Username = "admin",
			Password = "public"
		};

		options.CleanSession = true;
		options.KeepAlivePeriod = TimeSpan.FromSeconds(100.5);
		options.KeepAliveSendInterval = TimeSpan.FromSeconds(20000);

		if (null != _mqttClient)
		{
			await _mqttClient.DisconnectAsync();
			_mqttClient = null;
		}
		_mqttClient = new MqttFactory().CreateMqttClient();

		_mqttClient.ApplicationMessageReceived += (sender, args) =>
		{
			listBox1.BeginInvoke(
				_updateListBoxAction,
				$"ClientID:{args.ClientId} | TOPIC:{args.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)} | QoS:{args.ApplicationMessage.QualityOfServiceLevel} | Retain:{args.ApplicationMessage.Retain}"
				);
		};

		_mqttClient.Connected += (sender, args) =>
		{
			listBox1.BeginInvoke(_updateListBoxAction,
			$"Client is Connected:  IsSessionPresent:{args.IsSessionPresent}");
		};

		_mqttClient.Disconnected += (sender, args) =>
		{
			listBox1.BeginInvoke(_updateListBoxAction,
			$"Client is DisConnected ClientWasConnected:{args.ClientWasConnected}");
		};

		await _mqttClient.ConnectAsync(options);
    }
    catch (Exception)
    {
		throw;
    }
}

多個mqtt client連線伺服器:

多個mqtt client的連線使用ManagedClient,

private List<IManagedMqttClient> managedMqttClients = new List<IManagedMqttClient>(); 

private async void MqttMultiClient( int clientsCount)
{
    await Task.Factory.StartNew(async () =>
     {
		for (int i = 0; i < clientsCount; i++)
		{
			var options = new ManagedMqttClientOptionsBuilder()
			 .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
			 .WithClientOptions(new MqttClientOptionsBuilder()
				 .WithClientId(Guid.NewGuid().ToString().Substring(0, 13))
				 .WithTcpServer(TxbServer.Text, Convert.ToInt32(TxbPort.Text))
				 .WithCredentials("admin", "public")
				 .Build()
			 )
			 .Build();

			var c = new MqttFactory().CreateManagedMqttClient();
			await c.SubscribeAsync(
				new TopicFilterBuilder().WithTopic(txbSubscribe.Text)
				.WithQualityOfServiceLevel(
		        (MqttQualityOfServiceLevel)
				Enum.Parse(typeof(MqttQualityOfServiceLevel), CmbSubMqttQuality.Text)).Build());

			 await c.StartAsync(options);

			 managedMqttClients.Add(c);

			 Thread.Sleep(200);
        }
     });
}

關於ManagedClient的介紹,官方給出瞭如下解釋:

This project also contains a managed MQTT client. The client has some additional functionality in comparison with the regular MqttClient. Those functionalities are reflecting the most common use cases and thus the ManagedClient provides a out-of-the-box MQTT client with the following features.

  1. The managed client is started once and will maintain the connection automatically including reconnecting etc.
  2. All MQTT application messages are added to an internal queue and processed once the server is available.
  3. All MQTT application messages can be stored to support sending them after a restart of the application
  4. All subscriptions are managed across server connections. There is no need to subscribe manually after the connection with the server is lost.

 MqttNetGlobalLogger的使用

//MqttNetGlobalLogger的使用
MqttNetGlobalLogger.LogMessagePublished += (o, args) =>
{
    var s = new StringBuilder();
    s.Append($"{args.TraceMessage.Timestamp} ");
    s.Append($"{args.TraceMessage.Level} ");
    s.Append($"{args.TraceMessage.Source} ");
    s.Append($"{args.TraceMessage.ThreadId} ");
    s.Append($"{args.TraceMessage.Message} "); 
    s.Append($"{args.TraceMessage.Exception}");
    s.Append($"{args.TraceMessage.LogId} ");
};

執行效果圖: