1. 程式人生 > >mqtt協議 springboot2.0.4 mqttv3 釋出訂閱程式碼呼叫,mqtt斷線重連

mqtt協議 springboot2.0.4 mqttv3 釋出訂閱程式碼呼叫,mqtt斷線重連

上篇博文講了安裝和配置:https://blog.csdn.net/jianeng_Love_IT/article/details/83061717

mqttv3 釋出訂閱程式碼呼叫

我用的是springboot2.0.4

直接上程式碼:

pom.xml

  <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

簡單的呼叫程式碼:

回撥處理類:

public class CallBack implements MqttCallback {

    private MqttClient client;
    private MqttConnectOptions options;

    public CallBack() {
    }

    public CallBack(MqttClient client, MqttConnectOptions options) {
        this.client = client;
        this.options = options;
    }
    //方法實現說明 斷線重連方法,如果是持久訂閱,重連是不需要再次訂閱,如果是非持久訂閱,重連是需要重新訂閱主題 取決於options.setCleanSession(true); true為非持久訂閱
    @Override
    public void connectionLost(Throwable cause) {
       //失敗重連邏輯
        while (true){
            try {
                System.out.println("連線失敗重連");
                client.connect(options);
                //釋出相關的訂閱
                String[] topic = {"msg.topic","dance.topic"};
                int[] qos = {1,1};
                client.subscribe(topic, qos);
                System.out.println("連線失敗重連成功");
                break;
            } catch (MqttException e) {
                e.printStackTrace();
                System.out.println("連線失敗重連失敗");
            }
        }
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println(new Date());
        System.out.println("public void messageArrived(String s, MqttMessage mqttMessage)");
        System.out.println(s);
        System.out.println(new String(mqttMessage.getPayload(),"UTF-8"));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken)");
    }
}

釋出類:

public class MQTTPublisher {
    public static void publish(){
        try {
            MqttClient client = new MqttClient("tcp://127.0.0.1:61613","mqttserver-pub");
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("admin");
            options.setPassword("admin".toCharArray());
            options.setCleanSession(false);
            options.setAutomaticReconnect(true);
            client.connect(options);
            MqttTopic topic = client.getTopic("msg.topic");
            MqttMessage message = new MqttMessage("Hello World".getBytes());
            message.setQos(1);
            while(true){
                Thread.sleep(1000);
                MqttDeliveryToken token = topic.publish(message);
                while (!token.isComplete()){
                    token.waitForCompletion(1000);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

訂閱類:

public class MQTTSubsribe {
    public static String subsribe() {
        try {
            //建立MqttClient
            MqttClient client = new MqttClient("tcp://127.0.0.1:61613", "java_client23432ere");
            //建立連線可選項資訊
            MqttConnectOptions conOptions = new MqttConnectOptions();
            //
            conOptions.setUserName("admin");
            conOptions.setPassword("admin".toCharArray());
            conOptions.setCleanSession(false);
            conOptions.setAutomaticReconnect(true);
            //回撥處理類
            CallBack callback = new CallBack(client,conOptions);
            client.setCallback(callback);
            //連線broker
            client.connect(conOptions);
            //釋出相關的訂閱
            String[] topic = {"msg.topic","dance.topic"};
            int[] qos = {1,1};
            client.subscribe(topic, qos);
            //client.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
            return "failed";
        }
        return "success";
    }
}

測試類:

動動你的大手,試試!