1. 程式人生 > 實用技巧 >SpringBoot2.x整合MQTT實現訊息訂閱

SpringBoot2.x整合MQTT實現訊息訂閱

1.引入相關的依賴

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

2. 在配置檔案下配置MQTT伺服器資訊

spring.mqtt.username = admin
spring.mqtt.password = public
spring.mqtt.url = tcp://36.156.157.56:18083
spring.mqtt.client.id = one spring.mqtt.default.topic = topic spring.mqtt.default.completionTimeout = 3000

3.配置MQTT訊息推送配置

/**
 * MQTT配置
 * @Author: songyaru
 * @Date: 2020/8/28 14:04
 * @Version 1.0
 */

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttReceiveConfig {

    @Value("${spring.mqtt.username}")
    
private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Value("${spring.mqtt.default.completionTimeout}") private int completionTimeout;//連線超時 //初始化連線 @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(50); return mqttConnectOptions; } //初始化mqtt工廠 @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } //接收通道 @Primary @Bean("mqttInputChannel") public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,監聽的topic @Bean public MessageProducer inbound(@Qualifier("mqttInputChannel") MessageChannel messageChannel) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(),defaultTopic); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(messageChannel); return adapter; } //訂閱消費資料,通過通道獲取資料 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("主題:{},訊息接收到的資料:{}", message.getHeaders().get("mqtt_receivedTopic"), message.getPayload()); } }; } }

4.啟動服務,使用上一篇博文的訊息介面傳送訊息。