1. 程式人生 > 其它 >Spring整合Mqtt簡單使用

Spring整合Mqtt簡單使用

前言

MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基於釋出/訂閱模式的輕量級通訊協議,構建於TCP/IP協議之上,
優點是低開銷,低寬頻佔用,適用於物聯網、小型裝置等弱網環境。

Linux下安裝Mqtt伺服器

使用Docker安裝

docker pull emqx/emqx

這是一個開源的MQTT協議實現,支援MQTT5.0版本。

docker run -d --name mqtt -p 1883:1883 emqx/emqx

建立容器例項,MQTT預設埠號1883

配置賬號密碼

MQTT協議支援多種認證方式,如固定賬號密碼,查詢MySQL,查詢Redis等。具體可以檢視

EMQX認證
這裡我們使用EMQX內建Mnesia資料庫儲存賬號密碼。進入容器互動

docker exec -it ba087715dd9b /bin/bash

修改/etc/plugins/emqx_auth_mnesia.conf配置檔案,配置賬號密碼

auth.user.1.username = test1
auth.user.1.password = 123456

啟用emqx_auth_mnesia外掛

emqx_ctl plugins load emqx_auth_mnesia

關閉匿名訪問,修改/etc/emqx.conf配置檔案

allow_anonymous = false

重啟容器

docker restart ba087715dd9b

桌面客戶端連線

MQTTX-下載地址,效果圖如下

Java客戶端

新增maven依賴

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.2</version>
</dependency>

釋出主題

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class TestMqttPublish {

  public static void main(String[] args) throws MqttException {
    MqttClient mqttClient = createMqttClient();
    MqttMessage message = new MqttMessage("this is a message".getBytes());
    // 服務質量 
    message.setQos(2);
    // 釋出訊息
    mqttClient.publish("first_topic", message);
    // 斷開連線
    mqttClient.disconnect();
    mqttClient.close();
  }

  private static MqttClient createMqttClient() throws MqttException {
    // 伺服器地址
    String broker = "tcp://xxx:1883";
    String clientId = "emqx_test";//每個客戶端必須唯一,可以用隨機值
    MemoryPersistence persistence = new MemoryPersistence();
    MqttClient client = new MqttClient(broker, clientId, persistence);
    // 配置賬號密碼
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setUserName("test1");
    connOpts.setPassword("123456".toCharArray());
    connOpts.setCleanSession(true);
    // 建立連線
    client.connect(connOpts);
    return client;
  }

}

關於服務質量QOS,有3種取值

  • 0:至多一次,訊息可能會丟失或重複
  • 1:至少一次,訊息確保到達,但可能重複
  • 2:只有一次,確保訊息到達一次

訂閱主題

import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class TestMqttSubscribe {

  public static void main(String[] args) throws MqttException, InterruptedException {
    MqttClient mqttClient = createMqttClient();
    // 設定訊息回撥處理
    mqttClient.setCallback(new MyHandler());
    // 訂閱訊息
    mqttClient.subscribe("first_topic");
    TimeUnit.SECONDS.sleep(10);
    // 斷開連線
    mqttClient.disconnect();
    mqttClient.close();
  }

  private static MqttClient createMqttClient() throws MqttException {
    // 伺服器地址
    String broker = "tcp://xxx:1883";
    String clientId = "emqx_test2";//每個客戶端必須唯一,可以用隨機值
    MemoryPersistence persistence = new MemoryPersistence();
    MqttClient client = new MqttClient(broker, clientId, persistence);
    // 配置賬號密碼
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setUserName("test1");
    connOpts.setPassword("123456".toCharArray());
    connOpts.setCleanSession(true);
    // 建立連線
    client.connect(connOpts);
    return client;
  }

  static class MyHandler implements MqttCallback {

    /**
     * 連線異常斷開
     */
    @Override
    public void connectionLost(Throwable cause) {
      cause.printStackTrace();
    }

    /**
     * 訊息到達
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
      String msg = new String(message.getPayload());
      System.out.println(String.format("客戶端接收到訊息,主題為:%s,內容為:%s", topic, msg));
    }

    /**
     * 訊息傳輸完成
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
      System.out.println("訊息傳輸完成");
    }
  }

}

注意,訊息訂閱的客戶端和訊息釋出的客戶端的clientId必須不一樣。

MQTT5新特性

MQTT5增加了共享訂閱的功能,相當於訂閱端的負載均衡功能,在5.0之前,如果有多個客戶端訂閱了同一個主題,那麼這多個客戶端都會接收到此訊息。這種情況下,只能由訂閱者自行處理去重(防止多次消費)。
共享訂閱要求我們的主題格式必須為$share/{group}/{filter}

  • $share: 固定字首,表明這是一個共享訂閱
  • {group} : 群組名,是一個不包含 "/", "+" 以及 "#" 的字串。訂閱會話通過使用相同的{group}表示共享同一個訂閱,匹配該訂閱的訊息每次只會釋出給其中一個客戶端。
    例如,假設訂閱者s1,s2,s3屬於群組g1,訂閱者s4,s5屬於群組g2。那麼當 EMQX 向這個主題釋出訊息msg1的時候,s1,s2,s3中只有一個會收到 msg1,s4,s5中只有一個會收到 msg1
                                       [s1]
           msg1                      /
[emqx]  ------>  "$share/g1/topic"    - [s2] got msg1
         |                           \
         |                             [s3]
         | msg1
          ---->  "$share/g2/topic"   --  [s4]
                                     \
                                      [s5] got msg1

  • {filter}: 即非共享訂閱中的主題過濾器

訂閱主題程式碼為

mqttClient.subscribe("$share/mqtt/first_topic"); //如果非共享主題為/server/first_topic,那麼共享主題為$share/mqtt//server/first_topic

釋出主題程式碼為

mqttClient.publish("first_topic", message);

如果想要使用更多MQTT5新特性,需要使用下面的maven依賴

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
  <version>1.2.5</version>
</dependency>

更多新特性介紹,可以檢視MQTT 5.0

Spring整合Mqtt

新增maven依賴

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <version>5.1.6.RELEASE</version>
</dependency>

程式碼實現

import java.util.UUID;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@IntegrationComponentScan
public class MqttClientConfig {

  /**
   * 連線工廠,配置賬號密碼等資訊
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setUserName("test1");
    mqttConnectOptions.setPassword("123456".toCharArray());
    mqttConnectOptions.setServerURIs(new String[]{"tcp://xxx:1883"});
    mqttConnectOptions.setKeepAliveInterval(2);
    mqttConnectOptions.setAutomaticReconnect(true);
    factory.setConnectionOptions(mqttConnectOptions);
    return factory;
  }


  private String createClientId() {
    return UUID.randomUUID().toString();
  }

  /**
   * 配置client,釋出.
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        createClientId(), mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultQos(2);
    messageHandler.setDefaultRetained(false); //不保留訊息
    return messageHandler;
  }

  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }

  //接收通道
  @Bean
  public MessageChannel mqttInputChannel() {
    return new DirectChannel();
  }

  /**
   * 配置client,監聽的topic.
   */
  @Bean
  public MessageProducer inbound() {
    String[] topics = {"$share/mqtt/first_topic"};
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(createClientId(),
            mqttClientFactory(), topics);
    adapter.setTaskScheduler(new ThreadPoolTaskScheduler());
    adapter.setCompletionTimeout(3_000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(2);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
  }

  /**
   * 訊息處理器
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttInputChannel")
  public MessageHandler handler() {
    return (message -> {
      String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
      String payload = message.getPayload().toString();
      System.out.println("訊息主題:" + topic);
      System.out.println("訊息內容:" + payload);
    });
  }
}

底層也是使用的org.eclipse.paho.client.mqttv3依賴。接下來配置訊息閘道器

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

  void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

後續直接依賴此閘道器物件就可以了,Spring底層使用GatewayProxyFactoryBean來例項化此Bean。SpringBoot專案中配置上述兩個類就可以使用了。

參考

MQTT 入門介紹
EMQX文件