mqtt協議 springboot2.0.4 mqttv3 釋出訂閱程式碼呼叫,mqtt斷線重連
阿新 • • 發佈:2019-01-09
上篇博文講了安裝和配置:https://blog.csdn.net/jianeng_Love_IT/article/details/83061717
mqttv3 釋出訂閱程式碼呼叫
我用的是springboot2.0.4
直接上程式碼:
pom.xml
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
簡單的呼叫程式碼:
回撥處理類:
public class CallBack implements MqttCallback { private MqttClient client; private MqttConnectOptions options; public CallBack() { } public CallBack(MqttClient client, MqttConnectOptions options) { this.client = client; this.options = options; } //方法實現說明 斷線重連方法,如果是持久訂閱,重連是不需要再次訂閱,如果是非持久訂閱,重連是需要重新訂閱主題 取決於options.setCleanSession(true); true為非持久訂閱 @Override public void connectionLost(Throwable cause) { //失敗重連邏輯 while (true){ try { System.out.println("連線失敗重連"); client.connect(options); //釋出相關的訂閱 String[] topic = {"msg.topic","dance.topic"}; int[] qos = {1,1}; client.subscribe(topic, qos); System.out.println("連線失敗重連成功"); break; } catch (MqttException e) { e.printStackTrace(); System.out.println("連線失敗重連失敗"); } } } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println(new Date()); System.out.println("public void messageArrived(String s, MqttMessage mqttMessage)"); System.out.println(s); System.out.println(new String(mqttMessage.getPayload(),"UTF-8")); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)"); } }
釋出類:
public class MQTTPublisher { public static void publish(){ try { MqttClient client = new MqttClient("tcp://127.0.0.1:61613","mqttserver-pub"); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName("admin"); options.setPassword("admin".toCharArray()); options.setCleanSession(false); options.setAutomaticReconnect(true); client.connect(options); MqttTopic topic = client.getTopic("msg.topic"); MqttMessage message = new MqttMessage("Hello World".getBytes()); message.setQos(1); while(true){ Thread.sleep(1000); MqttDeliveryToken token = topic.publish(message); while (!token.isComplete()){ token.waitForCompletion(1000); } } } catch (Exception e) { e.printStackTrace(); } } }
訂閱類:
public class MQTTSubsribe {
public static String subsribe() {
try {
//建立MqttClient
MqttClient client = new MqttClient("tcp://127.0.0.1:61613", "java_client23432ere");
//建立連線可選項資訊
MqttConnectOptions conOptions = new MqttConnectOptions();
//
conOptions.setUserName("admin");
conOptions.setPassword("admin".toCharArray());
conOptions.setCleanSession(false);
conOptions.setAutomaticReconnect(true);
//回撥處理類
CallBack callback = new CallBack(client,conOptions);
client.setCallback(callback);
//連線broker
client.connect(conOptions);
//釋出相關的訂閱
String[] topic = {"msg.topic","dance.topic"};
int[] qos = {1,1};
client.subscribe(topic, qos);
//client.disconnect();
} catch (Exception e) {
e.printStackTrace();
return "failed";
}
return "success";
}
}
測試類:
動動你的大手,試試!