1. 程式人生 > >MQTT釋出訂閱程式完整程式碼

MQTT釋出訂閱程式完整程式碼

  1. 工具類,包含有釋出者方法和訂閱者方法。
    package cn.com.bonc.wholeCode;
    
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    import java.util.UUID;
    
    /**
     * mqtt的釋出和訂閱
     *
     * @author wzq
     */
    public class PublishSubscribe {
    
    
        private static String serviceURI = "tcp://172.16.22.160:1883";
        private static String clientID = UUID.randomUUID().toString();
        private static MqttClientPersistence persistence = new MemoryPersistence();
        //如果mqtt服務配置了匿名訪問,則不需要使用使用者名稱和密碼就可以實現訊息的訂閱和釋出
    //    private static String username = "username";
    //    private static String password = "password";
        private static String topic = "cebPark";
        /*
            訊息服務質量,一共有三個:
            0:盡力而為。訊息可能會丟,但絕不會重複傳輸
            1:訊息絕不會丟,但可能會重複傳輸
            2:恰好一次。每條訊息肯定會被傳輸一次且僅傳輸一次
         */
        private static int qos = 0;
    
        /**
         * 訊息釋出
         *
         * @author wzq
         **/
        public static void publish() {
            try {
                MqttClient client = new MqttClient(serviceURI, clientID, persistence);
                MqttConnectOptions connectOptions = new MqttConnectOptions();
    //            connectOptions.setUserName(username);
    //            connectOptions.setPassword(password.toCharArray());
                connectOptions.setCleanSession(false);
                //釋出者連線服務
                client.connect(connectOptions);
                System.out.println("釋出者連線狀態: " + client.isConnected());
                MqttTopic mqttTopic = client.getTopic(topic);
                //MqttMessage mqttMessage = new MqttMessage(message.getBytes());
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(qos);
                int i = 1;
                String message = "hello,智慧公廁-";
                while (true) {
                    String payLoad = message + i++;
                    mqttMessage.setPayload(payLoad.getBytes());
                    MqttDeliveryToken deliveryToken = mqttTopic.publish(mqttMessage);
                    if (!deliveryToken.isComplete()) {
                        System.out.println("釋出者釋出訊息: " + payLoad + " 失敗");
                        deliveryToken.waitForCompletion();
                    } else {
                        System.out.println("釋出者釋出訊息: " + payLoad + " 成功");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 訊息訂閱
         *
         * @author wzq
         **/
        public static void subscribe() {
            try {
                MqttClient client = new MqttClient(serviceURI, clientID, persistence);
                client.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable cause) {
                        System.out.println("訂閱者連線丟失...");
                        System.out.println(cause.getMessage());
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) {
                        System.out.println("訂閱者接收到訊息: " + message.toString());
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                    }
                });
                MqttConnectOptions connectOptions = new MqttConnectOptions();
    //            connectOptions.setUserName(username);
    //            connectOptions.setPassword(password.toCharArray());
                connectOptions.setCleanSession(false);
                //訂閱者連線訂閱主題
                client.connect(connectOptions);
                client.subscribe(topic, qos);
                System.out.println("訂閱者連線狀態: " + client.isConnected());
            } catch (MqttException e) {
                e.printStackTrace();
            }
    
        }
    
    
    }
    
  2. 釋出者
    package cn.com.bonc.wholeCode;
    
    /**
     * mqtt釋出
     * @author: wzq
     * @time: 2018-07-27 16:43
     */
    public class Publish {
        public static void main(String[] args) {
            PublishSubscribe.publish();
        }
    }
    
  3. 訂閱者
    package cn.com.bonc.wholeCode;
    
    /**
     * mqtt訂閱
     * @author: wzq
     * @time: 2018-07-27 16:43
     */
    public class Subscribe {
        public static void main(String[] args) {
            PublishSubscribe.subscribe();
        }
    }