Spring整合Mqtt簡單使用
前言
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基於釋出/訂閱模式的輕量級通訊協議,構建於TCP/IP協議之上,
優點是低開銷,低寬頻佔用,適用於物聯網、小型裝置等弱網環境。
Linux下安裝Mqtt伺服器
使用Docker安裝
docker pull emqx/emqx
這是一個開源的MQTT協議實現,支援MQTT5.0版本。
docker run -d --name mqtt -p 1883:1883 emqx/emqx
建立容器例項,MQTT預設埠號1883
配置賬號密碼
MQTT協議支援多種認證方式,如固定賬號密碼,查詢MySQL,查詢Redis等。具體可以檢視
這裡我們使用EMQX內建Mnesia資料庫儲存賬號密碼。進入容器互動
docker exec -it ba087715dd9b /bin/bash
修改/etc/plugins/emqx_auth_mnesia.conf配置檔案,配置賬號密碼
auth.user.1.username = test1
auth.user.1.password = 123456
啟用emqx_auth_mnesia外掛
emqx_ctl plugins load emqx_auth_mnesia
關閉匿名訪問,修改/etc/emqx.conf配置檔案
allow_anonymous = false
重啟容器
docker restart ba087715dd9b
桌面客戶端連線
MQTTX-下載地址,效果圖如下
Java客戶端
新增maven依賴
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
釋出主題
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class TestMqttPublish { public static void main(String[] args) throws MqttException { MqttClient mqttClient = createMqttClient(); MqttMessage message = new MqttMessage("this is a message".getBytes()); // 服務質量 message.setQos(2); // 釋出訊息 mqttClient.publish("first_topic", message); // 斷開連線 mqttClient.disconnect(); mqttClient.close(); } private static MqttClient createMqttClient() throws MqttException { // 伺服器地址 String broker = "tcp://xxx:1883"; String clientId = "emqx_test";//每個客戶端必須唯一,可以用隨機值 MemoryPersistence persistence = new MemoryPersistence(); MqttClient client = new MqttClient(broker, clientId, persistence); // 配置賬號密碼 MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName("test1"); connOpts.setPassword("123456".toCharArray()); connOpts.setCleanSession(true); // 建立連線 client.connect(connOpts); return client; } }
關於服務質量QOS,有3種取值
- 0:至多一次,訊息可能會丟失或重複
- 1:至少一次,訊息確保到達,但可能重複
- 2:只有一次,確保訊息到達一次
訂閱主題
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class TestMqttSubscribe {
public static void main(String[] args) throws MqttException, InterruptedException {
MqttClient mqttClient = createMqttClient();
// 設定訊息回撥處理
mqttClient.setCallback(new MyHandler());
// 訂閱訊息
mqttClient.subscribe("first_topic");
TimeUnit.SECONDS.sleep(10);
// 斷開連線
mqttClient.disconnect();
mqttClient.close();
}
private static MqttClient createMqttClient() throws MqttException {
// 伺服器地址
String broker = "tcp://xxx:1883";
String clientId = "emqx_test2";//每個客戶端必須唯一,可以用隨機值
MemoryPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient(broker, clientId, persistence);
// 配置賬號密碼
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("test1");
connOpts.setPassword("123456".toCharArray());
connOpts.setCleanSession(true);
// 建立連線
client.connect(connOpts);
return client;
}
static class MyHandler implements MqttCallback {
/**
* 連線異常斷開
*/
@Override
public void connectionLost(Throwable cause) {
cause.printStackTrace();
}
/**
* 訊息到達
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
System.out.println(String.format("客戶端接收到訊息,主題為:%s,內容為:%s", topic, msg));
}
/**
* 訊息傳輸完成
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("訊息傳輸完成");
}
}
}
注意,訊息訂閱的客戶端和訊息釋出的客戶端的clientId必須不一樣。
MQTT5新特性
MQTT5增加了共享訂閱
的功能,相當於訂閱端的負載均衡功能,在5.0之前,如果有多個客戶端訂閱了同一個主題,那麼這多個客戶端都會接收到此訊息。這種情況下,只能由訂閱者自行處理去重(防止多次消費)。
共享訂閱要求我們的主題格式必須為$share/{group}/{filter}
- $share: 固定字首,表明這是一個共享訂閱
- {group} : 群組名,是一個不包含 "/", "+" 以及 "#" 的字串。訂閱會話通過使用相同的{group}表示共享同一個訂閱,匹配該訂閱的訊息每次只會釋出給其中一個客戶端。
例如,假設訂閱者s1,s2,s3屬於群組g1,訂閱者s4,s5屬於群組g2。那麼當 EMQX 向這個主題釋出訊息msg1的時候,s1,s2,s3中只有一個會收到 msg1,s4,s5中只有一個會收到 msg1
[s1]
msg1 /
[emqx] ------> "$share/g1/topic" - [s2] got msg1
| \
| [s3]
| msg1
----> "$share/g2/topic" -- [s4]
\
[s5] got msg1
- {filter}: 即非共享訂閱中的主題過濾器
訂閱主題程式碼為
mqttClient.subscribe("$share/mqtt/first_topic"); //如果非共享主題為/server/first_topic,那麼共享主題為$share/mqtt//server/first_topic
釋出主題程式碼為
mqttClient.publish("first_topic", message);
如果想要使用更多MQTT5新特性,需要使用下面的maven依賴
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
更多新特性介紹,可以檢視MQTT 5.0。
Spring整合Mqtt
新增maven依賴
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.1.6.RELEASE</version>
</dependency>
程式碼實現
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
@IntegrationComponentScan
public class MqttClientConfig {
/**
* 連線工廠,配置賬號密碼等資訊
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("test1");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setServerURIs(new String[]{"tcp://xxx:1883"});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setAutomaticReconnect(true);
factory.setConnectionOptions(mqttConnectOptions);
return factory;
}
private String createClientId() {
return UUID.randomUUID().toString();
}
/**
* 配置client,釋出.
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
createClientId(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultQos(2);
messageHandler.setDefaultRetained(false); //不保留訊息
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 配置client,監聽的topic.
*/
@Bean
public MessageProducer inbound() {
String[] topics = {"$share/mqtt/first_topic"};
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(createClientId(),
mqttClientFactory(), topics);
adapter.setTaskScheduler(new ThreadPoolTaskScheduler());
adapter.setCompletionTimeout(3_000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 訊息處理器
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return (message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
System.out.println("訊息主題:" + topic);
System.out.println("訊息內容:" + payload);
});
}
}
底層也是使用的org.eclipse.paho.client.mqttv3依賴。接下來配置訊息閘道器
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
後續直接依賴此閘道器物件就可以了,Spring底層使用GatewayProxyFactoryBean來例項化此Bean。SpringBoot專案中配置上述兩個類就可以使用了。