Java MQTT 客戶端之 Paho
阿新 • • 發佈:2020-07-17
當 cleanSession 設為 true 時,Paho 自動重連後先前訂閱的主題會被清空。因此需要實現 MqttCallbackExtended 介面(而不是 MqttCallback 介面),並在 connectComplete 方法中重新訂閱主題。
一、新增引用
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
二、新增配置
mqtt:
  client:
    username: admin
    password: public
    serverURI: tcp://192.168.137.101:1883
    clientId: paho_${random.int[1000,9999]}
    keepAliveInterval: 120
    connectionTimeout: 30
  producer:
    defaultQos: 1
    defaultRetained: true
    defaultTopic: topic/test1
  consumer:
    consumerTopics: topic/test2,topic/test3
三、程式碼
3.1.客戶端
@Configuration public class MqttConfig { @Value("${mqtt.client.username}") private String username; @Value("${mqtt.client.password}") private String password; @Value("${mqtt.client.serverURI}") private String serverURI; @Value("${mqtt.client.clientId}") private String clientId; @Value("${mqtt.client.keepAliveInterval}") private int keepAliveInterval; @Value("${mqtt.client.connectionTimeout}") private int connectionTimeout; @Autowired private MyMqttCallback myMqttCallback; @Bean public MqttClient mqttClient() { try { MqttClientPersistence persistence = mqttClientPersistence(); MqttClient client = new MqttClient(serverURI, clientId, persistence); myMqttCallback.setMqttClient(client); client.setCallback(myMqttCallback); client.connect(mqttConnectOptions()); // client.subscribe(subTopic); return client; } catch (MqttException e) { System.out.println(e.getMessage()); return null; } } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setCleanSession(true); options.setAutomaticReconnect(true); options.setConnectionTimeout(connectionTimeout); options.setKeepAliveInterval(keepAliveInterval); return options; } public MqttClientPersistence mqttClientPersistence() { return new MemoryPersistence(); } }
3.2.訂閱者
/**
 * Paho callback that forwards incoming messages to {@link MqttService} and subscribes
 * to the configured consumer topics each time a connection completes.
 *
 * <p>The client reference is supplied through {@link #setMqttClient(MqttClient)}
 * rather than injected.
 */
@Component
public class MyMqttCallback implements MqttCallbackExtended {

    @Value("${mqtt.consumer.consumerTopics}")
    private String[] consumerTopics;

    @Autowired
    private MqttService mqttService;

    private MqttClient mqttClient;

    /**
     * Reports a lost connection together with its cause.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Include the cause; the bare message alone gives no hint why the link dropped.
        System.out.println("連線斷開: " + throwable);
    }

    /**
     * Hands an incoming message to the service layer.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     * @throws Exception whatever {@code mqttService.message} throws
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        mqttService.message(topic, message);
    }

    /**
     * Prints whether the delivery identified by the token is complete.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    /**
     * Subscribes to all configured consumer topics once a connection is established.
     *
     * @param b reconnect flag passed by Paho (not used here)
     * @param s server URI passed by Paho (used only in failure output)
     */
    @Override
    public void connectComplete(boolean b, String s) {
        if (mqttClient == null) {
            // setMqttClient was never called; fail visibly instead of with an NPE.
            System.out.println("connectComplete: MQTT client not set, cannot subscribe");
            return;
        }
        try {
            mqttClient.subscribe(consumerTopics);
        } catch (MqttException e) {
            System.out.println("Failed to subscribe to "
                    + java.util.Arrays.toString(consumerTopics) + " on " + s + ": " + e);
        }
    }

    /**
     * Sets the client used for subscribing in {@link #connectComplete(boolean, String)}.
     *
     * @param mqttClient the client this callback is registered on
     */
    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }
}
3.3.釋出者
/**
 * Publishes text payloads through the shared {@link MqttClient}.
 *
 * <p>Each overload fills in one more default ({@code mqtt.producer.*} properties) and
 * delegates to {@link #send(String, int, boolean, String)}.
 */
@Component
public class MqttProducer {

    @Value("${mqtt.producer.defaultQos}")
    private int defaultProducerQos;

    @Value("${mqtt.producer.defaultRetained}")
    private boolean defaultRetained;

    @Value("${mqtt.producer.defaultTopic}")
    private String defaultTopic;

    @Autowired
    private MqttClient mqttClient;

    /**
     * Publishes to the default topic with the default QoS and retained flag.
     *
     * @param payload text to publish
     */
    public void send(String payload) {
        this.send(defaultTopic, payload);
    }

    /**
     * Publishes to the given topic with the default QoS and retained flag.
     *
     * @param topic   destination topic
     * @param payload text to publish
     */
    public void send(String topic, String payload) {
        this.send(topic, defaultProducerQos, payload);
    }

    /**
     * Publishes to the given topic and QoS with the default retained flag.
     *
     * @param topic   destination topic
     * @param qos     quality-of-service level passed to the client
     * @param payload text to publish
     */
    public void send(String topic, int qos, String payload) {
        this.send(topic, qos, defaultRetained, payload);
    }

    /**
     * Publishes the payload, encoded as UTF-8, with explicit topic, QoS and retained flag.
     *
     * <p>A publish failure is reported on standard output and not propagated.
     *
     * @param topic    destination topic
     * @param qos      quality-of-service level passed to the client
     * @param retained retained flag passed to the client
     * @param payload  text to publish
     */
    public void send(String topic, int qos, boolean retained, String payload) {
        try {
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            byte[] body = payload.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            mqttClient.publish(topic, body, qos, retained);
        } catch (MqttException e) {
            System.out.println("Failed to publish to " + topic + ": " + e);
        }
    }
}
/**
 * Demo REST endpoint that triggers a publish through {@link MqttProducer}.
 */
@RestController
public class MqttController {

    /** Fixed payload sent by the {@code /send} endpoint. */
    private static final String DEMO_PAYLOAD = "test content";

    @Autowired
    private MqttProducer mqttProducer;

    /**
     * Publishes the fixed demo payload using the producer's defaults.
     */
    @RequestMapping("/send")
    public void send() {
        mqttProducer.send(DEMO_PAYLOAD);
    }
}
完整程式碼:GitHub
參考