Java實現MQTT通訊
阿新 • • 發佈:2021-09-16
關於MQTT
MQTT是一個基於客戶端-伺服器的訊息釋出/訂閱傳輸協議。MQTT協議輕量、簡單、開放且易於實現,這些特點使它的適用範圍非常廣泛,在很多情況下(包括受限的環境中)都能使用,例如:機器與機器(M2M)通訊和物聯網(IoT)。
釋出訂閱
服務端
服務端使用 mosquitto
下載頁面:https://mosquitto.org/download/
客戶端
MQTTX
下載頁面:https://mqttx.app/#download
MQTT.fx
下載連結:http://www.jensd.de/apps/mqttfx/1.7.1/mqttfx-1.7.1-windows-x64.exe
paho
https://github.com/eclipse/paho.mqtt.java
Paho是Eclipse提供的MQTT客戶端開源庫,Java程式碼整合這個客戶端來收發訊息。
程式碼
依賴
pom.xml
<!--
  MQTT: Spring Integration MQTT module; the Java classes in this article import
  from org.springframework.integration.mqtt.
  NOTE(review): no <version> element is declared here, so the version is presumably
  managed by a parent POM or BOM. Confirm before copying into a standalone project.
-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
類MqttConfig
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import java.util.Date; @Configuration public class MqttConfig { // 消費訊息 /** * 建立MqttPahoClientFactory,設定MQTT Broker連線屬性,如果使用SSL驗證,也在這裡設定。 * @return */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"}); factory.setConnectionOptions(options); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-SS", mqttClientFactory(), "boat", "collector", "battery", "+/sensor"); adapter.setCompletionTimeout(5000); DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); defaultPahoMessageConverter.setPayloadAsBytes(true); adapter.setConverter(defaultPahoMessageConverter); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean // 
ServiceActivator註解表明當前方法用於處理MQTT訊息,inputChannel引數指定了用於消費訊息的channel。 @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return message -> { String payload = message.getPayload().toString(); // byte[] bytes = (byte[]) message.getPayload(); // 收到的訊息是位元組格式 String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 根據topic分別進行訊息處理。 if (topic.matches(".+/sensor")) { // 匹配:1/sensor String sensorSn = topic.split("/")[0]; System.out.println("感測器" + sensorSn + ": 的訊息: " + payload); } else if (topic.equals("collector")) { System.out.println("採集器的訊息:" + payload); } else { System.out.println("丟棄訊息:主題【" + topic + "],負載:" + payload); } }; } // 傳送訊息 @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * 傳送訊息和消費訊息Channel可以使用相同MqttPahoClientFactory * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler outbound() { // 在這裡進行mqttOutboundChannel的相關設定 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory()); messageHandler.setAsync(true); //如果設定成true,即非同步,傳送訊息時將不會阻塞。 messageHandler.setDefaultTopic("command"); messageHandler.setDefaultQos(2); DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); // defaultPahoMessageConverter.setPayloadAsBytes(true); // 傳送預設按位元組型別傳送訊息 messageHandler.setConverter(defaultPahoMessageConverter); return messageHandler; } }
介面MqttGateway
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { // 定義過載方法,用於訊息傳送 void sendToMqtt(String payload); // 指定topic進行訊息傳送 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }
測試
類MqttController
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * REST controller for trying out MQTT publishing through {@link MqttGateway}.
 */
@RestController
public class MqttController {

    @Resource
    private MqttGateway mqttGateway;

    /**
     * Publishes the path-supplied message to the path-supplied topic with QoS 1.
     *
     * @param topic   MQTT topic taken from the URL path
     * @param message message text taken from the URL path
     * @return a confirmation string echoing the message
     */
    @RequestMapping("/send/{topic}/{message}")
    public String send(@PathVariable String topic, @PathVariable String message) {
        // Publish to the requested topic at a fixed QoS level.
        final int qos = 1;
        mqttGateway.sendToMqtt(topic, qos, message);

        String confirmation = "send message : " + message;
        return confirmation;
    }
}