1. 程式人生 > >Android 物聯網常用網路框架Mqtt

Android 物聯網常用網路框架Mqtt

 

import android.content.Intent;
import android.util.Log;

import com.sqy.vending.mqttvending_master.app.MyApplication;
import com.sqy.vending.mqttvending_master.bean.MqttMsgEvent;
import com.sqy.vending.mqttvending_master.db.DatabaseUtil;
import com.sqy.vending.mqttvending_master.util.JsonUtil;
import com.sqy.vending.mqttvending_master.util.LogUtils;
import com.sqy.vending.mqttvending_master.util.SharedPreferencesUtil;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.greenrobot.eventbus.EventBus;
import org.json.JSONException;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.sqy.vending.mqttvending_master.manager.Mqtt.isBoolean;

/**
 * Created by Administrator on 2018/6/7.
 */

public class MqttManager {
    public static String TAG = MqttManager.class.getSimpleName();
    public static MqttManager mInstance = null;
    public MqttClient client;
    public MqttConnectOptions conOpt;
    private ScheduledExecutorService scheduler;
    private boolean clean = true;

    private static volatile boolean hasReConnection = false;

//    private ThreadPoolExecutor executor = new ThreadPoolExecutor();//使用執行緒池

    public static MqttManager getInstance() {
        if (null == mInstance) {
            mInstance = new MqttManager();
        }
        return mInstance;
    }

    /**
     * 建立 Mqtt 連線
     */
    public boolean createConnect(String brokerUrl, String userName, String password, String clientId) {
        boolean flag = false;
        String tmpDir = System.getProperty("java.io.tmpdir");
        MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
        Log.e(TAG, "createConnect: brokerUrl:" + brokerUrl + "  userName:" + userName + "  password:" + password + "  clientId:" + clientId);
        try {
            conOpt = new MqttConnectOptions();
            conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            conOpt.setCleanSession(clean);
            conOpt.setPassword(password.toCharArray());
            conOpt.setUserName(userName);
            conOpt.setConnectionTimeout(10);    //設定連線超時
            conOpt.setKeepAliveInterval(8);    //設定心跳時長 10*1.5
            client = new MqttClient(brokerUrl, clientId, dataStore);
            client.setCallback(mqttCallback);
            if (client != null) {
                try {
                    client.connect(conOpt);
                    flag = true;
                } catch (Exception e) {
                    flag = false;
                }
            }
        } catch (MqttException e) {
            Log.e(TAG, "createConnect:MqttException: " + e.toString());
        }
        return flag;
    }

    /**
     * 連線服務端
     */
    public void Connect() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                boolean isflag = MqttManager.getInstance().createConnect(Mqtt.Mqtt_Url, Mqtt.Mqtt_UserName, Mqtt.Mqtt_Port, SharedPreferencesUtil.getString("device_id"));
                if (isflag) {
                    subScrbe();
                    Mqtt.isBoolean = true;
                    LogUtils.e(TAG, "連線成功...");
                    Log.e(TAG, "連線成功...");
                } else {
                    LogUtils.e(TAG, "連線失敗...");
                    Log.e(TAG, "連線失敗...");
                    EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("連線失敗", Mqtt.CONNECTION_FAIL)));
                }
                startReconnect();
            }
        }).start();
    }

    /**
     * 定時器,每隔10秒檢查一次mqtt是否連線
     */
    private void startReconnect() {
        Log.e("RE", "startReconnect: 重連::");
        if (scheduler == null) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    if (!MqttManager.getInstance().client.isConnected()) {
                        LogUtils.e(TAG, "定時檢查到mqtt連線斷開...");
                        Log.e("RE", "定時檢查到mqtt連線斷開...");
                        isBoolean = false;
                        hasReConnection = true;
                        Connect();
                    }
                }
            }, 10 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * 訂閱
     */
    public static void subScrbe() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                boolean isflag = MqttManager.getInstance().subScribe(Mqtt.Mqtt_TopDevice + SharedPreferencesUtil.getString("device_id"), 2);
                if (!isflag) {
                    LogUtils.e(TAG, "訂閱失敗" + SharedPreferencesUtil.getString("device_id"));
                    Log.e(TAG, "訂閱失敗" + SharedPreferencesUtil.getString("device_id"));
                } else {
                    LogUtils.e(TAG, "訂閱成功" + SharedPreferencesUtil.getString("device_id"));
                    Log.e(TAG, "訂閱成功" + SharedPreferencesUtil.getString("device_id"));

                    /**
                     * 傳送心跳包
                     * 心跳包內容是獲取到的裝置狀態
                     * */
//                    if (hasReConnection) {
                    sendHbPacket();
//                    }
                }
            }
        }).start();
    }

    private static Timer uploadStatusTimer = null;

    /**
     * 傳送心跳包
     */
    private static void sendHbPacket() {
        if (uploadStatusTimer == null) {
            uploadStatusTimer = new Timer();
            uploadStatusTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    String json = JsonUtil.uploadDevStatus(DatabaseUtil.getInstance());
                    if (!isBoolean) {
                        LogUtils.e(TAG, "誒呀,還沒有連線服務端...");
                    } else {
                        boolean isflag = MqttManager.getInstance().publish(Mqtt.Mqtt_TopService + SharedPreferencesUtil.getString("device_id"), 2, json.getBytes());
                        LogUtils.d("TAG", "心跳包,上報狀態=" + json.toString());
                        Log.e("TAG", "心跳包,上報狀態=" + json.toString());
                        if (!isflag) {
                            LogUtils.e(TAG, "傳送失敗");
                        }
                    }
                }
            }, 30 * 1000, 30 * 1000);
        }
    }


    /**
     * 傳送心跳包
     *//*
    public static void sendHbPacket(final String json) {
        if (!isBoolean) {
            LogUtils.e(TAG, "誒呀,還沒有連線服務端...");
            return;
        }
        new Thread(new Runnable() {
            @Override
            public void run() {
                if (json.equals("")) {
                    return;
                }
                while (isBoolean) {
                    LogUtils.e(TAG, "上報狀態" + json);
                    boolean isflag = MqttManager.getInstance().publish(Mqtt.Mqtt_TopService + SharedPreferencesUtil.getString("device_id"), 2, json.getBytes());
                    if (!isflag) {
                        LogUtils.e(TAG, "傳送失敗");
                    }
                    try {
                        Thread.sleep(30 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

    }*/

    /**
     * 向服務端傳送資料
     */
    public static boolean sendData(String json) {
        boolean isflag = false;
        if (json.equals("") || json == null) {
            isflag = false;
            return isflag;
        }
        if (isBoolean) {
            LogUtils.e(TAG, "傳送的資料為:" + json);
            Log.e(TAG, "sendData: 傳送的資料為:" + json);
            isflag = MqttManager.getInstance().publish(Mqtt.Mqtt_TopService + SharedPreferencesUtil.getString("device_id"), 2, json.getBytes());
        }
        return isflag;
    }

    /**
     * 傳送資料到服務端
     */
    public boolean publish(String topicName, int qos, byte[] payload) {
        boolean flag = false;
        if (client != null && client.isConnected()) {
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
            try {
                client.publish(topicName, message);
                flag = true;
            } catch (MqttException e) {
            }
        }
        return flag;
    }

    /**
     * 訂閱
     */
    public boolean subScribe(String topicName, int qos) {
        boolean flag = false;
        if (client != null && client.isConnected()) {
            try {
                client.subscribe(topicName, qos);
                flag = true;
//                EventBus.getDefault().post(new MqttMsgEvent("訂閱成功"));
                EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("訂閱成功", Mqtt.CONNECTION_SUCCESS)));
            } catch (MqttException e) {
//                EventBus.getDefault().post(new MqttMsgEvent("訂閱異常"));
                EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("訂閱異常" + e.toString(), Mqtt.CONNECTION_EXCEPTION)));
            }
        }
        return flag;
    }

    /**
     * 取消連線
     */
    public void disConnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
//            EventBus.getDefault().post(new MqttMsgEvent("斷開連線"));
            EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("斷開連線", Mqtt.DISCONNECT)));
        }
    }

    private MqttCallback mqttCallback = new MqttCallback() {
        /**
         * 接收服務端訊息回撥
         * */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            LogUtils.e(TAG, "Server:" + message.toString());
            EventBus.getDefault().post(new MqttMsgEvent(message.toString()));
        }

        /**
         * 與服務端連線丟失,回撥此方法,重新連線
         * */
        @Override
        public void connectionLost(Throwable cause) {
            LogUtils.e(TAG, cause.getMessage());
//            EventBus.getDefault().post(new MqttMsgEvent("與服務端連線丟失"));
            EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("與服務端連線丟失" + cause.toString(), Mqtt.LOST_CONNECTION)));
            isBoolean = false;
            if (uploadStatusTimer != null) {
                uploadStatusTimer.cancel();
                uploadStatusTimer = null;
            }
            Connect();
        }

        /**
         * 與訊息相關聯的送達令牌
         * 傳送訊息回撥
         * */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {

        }
    };

    /**
     * 釋放單例, 及其所引用的資源
     */
    public static void release() throws Exception {
        if (mInstance != null) {
            mInstance.disConnect();
            mInstance = null;
        }
    }
}

 

 

 

 

 

import com.sqy.vending.mqttvending_master.util.SharedPreferencesUtil;

/**
 * Created by Administrator on 2018/6/11.
 */
public class Mqtt {

    /**
     * mqtt tcp連線地址
     */
    public static String Mqtt_Url = "";

    /**
     * 域名地址
     */
    public static String Mqtt_UserName = "";
//    public static String Mqtt_UserName = String.valueOf(System.currentTimeMillis()/1000);

    /**
     * mqtt連線埠號
     */
    public static String Mqtt_Port = "";

    /**
     * mqtt連線測試Id
     */
//    public static String Mqtt_ClientId;

    /**
     * 傳送訊息主題
     */
    public static String Mqtt_TopDevice = "";

    /**
     * 訂閱主題
     */
    public static String Mqtt_TopService = "";
    ;

    /**
     * 標記是否開啟mqtt連線
     */
    public static volatile boolean isBoolean = false;

    public static final String CONNECTION_SUCCESS = "0";
    public static final String CONNECTION_EXCEPTION = "1";
    public static final String DISCONNECT = "2";
    public static final String LOST_CONNECTION = "3";
    public static final String CONNECTION_FAIL = "4";

}