Android 物聯網常用網路框架Mqtt
import android.content.Intent;
import android.util.Log;
import com.sqy.vending.mqttvending_master.app.MyApplication;
import com.sqy.vending.mqttvending_master.bean.MqttMsgEvent;
import com.sqy.vending.mqttvending_master.db.DatabaseUtil;
import com.sqy.vending.mqttvending_master.util.JsonUtil;
import com.sqy.vending.mqttvending_master.util.LogUtils;
import com.sqy.vending.mqttvending_master.util.SharedPreferencesUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.greenrobot.eventbus.EventBus;
import org.json.JSONException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static com.sqy.vending.mqttvending_master.manager.Mqtt.isBoolean;
/**
* Created by Administrator on 2018/6/7.
*/
/**
 * Singleton wrapper around a Paho {@link MqttClient}.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>open a connection using the settings held in {@link Mqtt};</li>
 *   <li>subscribe to {@code Mqtt.Mqtt_TopDevice + device_id};</li>
 *   <li>publish a device-status heartbeat every 30 seconds;</li>
 *   <li>check every 10 seconds that the client is still connected and reconnect if not;</li>
 *   <li>forward incoming messages and connection state changes through {@link EventBus}.</li>
 * </ul>
 *
 * <p>Blocking network calls are made on worker threads started by this class.
 */
public class MqttManager {
    public static String TAG = MqttManager.class.getSimpleName();
    public static MqttManager mInstance = null;

    /** Serialises connection attempts so that only one client is created at a time. */
    private static final Object CONNECT_LOCK = new Object();
    /** Guards creation and cancellation of {@link #uploadStatusTimer}. */
    private static final Object TIMER_LOCK = new Object();

    public MqttClient client;
    public MqttConnectOptions conOpt;
    private ScheduledExecutorService scheduler;
    private final boolean clean = true;
    private static volatile boolean hasReConnection = false;
    private static Timer uploadStatusTimer = null;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * <p>Synchronized because this class calls it from several worker threads.
     */
    public static synchronized MqttManager getInstance() {
        if (null == mInstance) {
            mInstance = new MqttManager();
        }
        return mInstance;
    }

    /**
     * Creates a new MQTT client and connects it to the broker (blocking).
     *
     * <p>Any client created by an earlier call is disconnected and closed first so that its
     * resources are released before the replacement is created.
     *
     * @param brokerUrl broker URI passed to the {@link MqttClient} constructor
     * @param userName  user name for the connection
     * @param password  password for the connection; skipped when {@code null}
     * @param clientId  MQTT client identifier
     * @return {@code true} if the connection was established, {@code false} on any failure
     */
    public boolean createConnect(String brokerUrl, String userName, String password, String clientId) {
        // The password is deliberately not logged.
        Log.e(TAG, "createConnect: brokerUrl:" + brokerUrl + " userName:" + userName + " clientId:" + clientId);
        synchronized (CONNECT_LOCK) {
            closeClient();
            try {
                String tmpDir = System.getProperty("java.io.tmpdir");
                MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);

                MqttConnectOptions options = new MqttConnectOptions();
                options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
                options.setCleanSession(clean);
                if (password != null) {
                    options.setPassword(password.toCharArray());
                }
                options.setUserName(userName);
                options.setConnectionTimeout(10); // connection timeout, seconds
                options.setKeepAliveInterval(8);  // keep-alive interval, seconds
                conOpt = options;

                MqttClient newClient = new MqttClient(brokerUrl, clientId, dataStore);
                newClient.setCallback(mqttCallback);
                client = newClient;
                newClient.connect(options);
                return true;
            } catch (MqttException e) {
                Log.e(TAG, "createConnect:MqttException: " + e.toString());
            } catch (RuntimeException e) {
                // Invalid arguments (e.g. a null client id or malformed URI) must not kill the
                // calling worker thread; report them as a failed connection instead.
                Log.e(TAG, "createConnect:RuntimeException: " + e.toString());
            }
            return false;
        }
    }

    /**
     * Connects to the broker on a background thread using the settings in {@link Mqtt}, then
     * subscribes and makes sure the periodic reconnect check is running.
     *
     * <p>If the client is already connected no second client is created; the call is treated
     * as a successful connection.
     */
    public void Connect() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                boolean connected;
                synchronized (CONNECT_LOCK) {
                    MqttManager manager = MqttManager.getInstance();
                    MqttClient current = manager.client;
                    if (current != null && current.isConnected()) {
                        // A concurrent attempt (connectionLost vs. the periodic check) already
                        // succeeded; connecting again would only create a duplicate client.
                        connected = true;
                    } else {
                        // NOTE(review): Mqtt.Mqtt_Port is passed in the password position — confirm this is intended.
                        connected = manager.createConnect(Mqtt.Mqtt_Url, Mqtt.Mqtt_UserName, Mqtt.Mqtt_Port,
                                SharedPreferencesUtil.getString("device_id"));
                    }
                }
                if (connected) {
                    subScrbe();
                    Mqtt.isBoolean = true;
                    LogUtils.e(TAG, "連線成功...");
                    Log.e(TAG, "連線成功...");
                } else {
                    LogUtils.e(TAG, "連線失敗...");
                    Log.e(TAG, "連線失敗...");
                    EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("連線失敗", Mqtt.CONNECTION_FAIL)));
                }
                startReconnect();
            }
        }).start();
    }

    /**
     * Starts, once, a task that checks every 10 seconds whether the client is connected and
     * calls {@link #Connect()} when it is not.
     */
    private synchronized void startReconnect() {
        Log.e("RE", "startReconnect: 重連::");
        if (scheduler != null) {
            return;
        }
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // An exception escaping a fixed-rate task cancels all later runs, so nothing
                // may propagate out of this method.
                try {
                    MqttClient current = MqttManager.getInstance().client;
                    if (current == null || !current.isConnected()) {
                        LogUtils.e(TAG, "定時檢查到mqtt連線斷開...");
                        Log.e("RE", "定時檢查到mqtt連線斷開...");
                        isBoolean = false;
                        hasReConnection = true;
                        Connect();
                    }
                } catch (RuntimeException e) {
                    Log.e("RE", "startReconnect: check failed: " + e.toString());
                }
            }
        }, 10 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
    }

    /** Stops the periodic reconnect check, if it is running. */
    private synchronized void stopReconnect() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    /**
     * Subscribes, on a background thread, to {@code Mqtt.Mqtt_TopDevice + device_id} with QoS 2
     * and starts the heartbeat once the subscription succeeds.
     */
    public static void subScrbe() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                String deviceId = SharedPreferencesUtil.getString("device_id");
                boolean subscribed = MqttManager.getInstance().subScribe(Mqtt.Mqtt_TopDevice + deviceId, 2);
                if (!subscribed) {
                    LogUtils.e(TAG, "訂閱失敗" + deviceId);
                    Log.e(TAG, "訂閱失敗" + deviceId);
                    return;
                }
                LogUtils.e(TAG, "訂閱成功" + deviceId);
                Log.e(TAG, "訂閱成功" + deviceId);
                // The heartbeat payload is the current device status.
                sendHbPacket();
            }
        }).start();
    }

    /**
     * Starts, once, a timer that publishes the device status every 30 seconds to
     * {@code Mqtt.Mqtt_TopService + device_id} while {@code Mqtt.isBoolean} is set.
     */
    private static void sendHbPacket() {
        synchronized (TIMER_LOCK) {
            if (uploadStatusTimer != null) {
                return;
            }
            uploadStatusTimer = new Timer();
            uploadStatusTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // An uncaught exception would terminate the Timer thread and silently end
                    // the heartbeat, so failures are logged instead.
                    try {
                        String json = JsonUtil.uploadDevStatus(DatabaseUtil.getInstance());
                        if (!isBoolean) {
                            LogUtils.e(TAG, "誒呀,還沒有連線服務端...");
                            return;
                        }
                        if (json == null) {
                            LogUtils.e(TAG, "傳送失敗");
                            return;
                        }
                        boolean sent = MqttManager.getInstance().publish(
                                Mqtt.Mqtt_TopService + SharedPreferencesUtil.getString("device_id"), 2, json.getBytes());
                        LogUtils.d("TAG", "心跳包,上報狀態=" + json);
                        Log.e("TAG", "心跳包,上報狀態=" + json);
                        if (!sent) {
                            LogUtils.e(TAG, "傳送失敗");
                        }
                    } catch (RuntimeException e) {
                        Log.e(TAG, "sendHbPacket: " + e.toString());
                    }
                }
            }, 30 * 1000, 30 * 1000);
        }
    }

    /** Cancels the heartbeat timer, if it is running. */
    private static void stopHbPacket() {
        synchronized (TIMER_LOCK) {
            if (uploadStatusTimer != null) {
                uploadStatusTimer.cancel();
                uploadStatusTimer = null;
            }
        }
    }

    /**
     * Publishes {@code json} with QoS 2 to {@code Mqtt.Mqtt_TopService + device_id}.
     *
     * @param json payload to send
     * @return {@code true} if the message was handed to the client; {@code false} when
     *         {@code json} is {@code null} or empty, when {@code Mqtt.isBoolean} is not set,
     *         or when publishing failed
     */
    public static boolean sendData(String json) {
        // The null check must come first; the reverse order throws on a null argument.
        if (json == null || json.isEmpty()) {
            return false;
        }
        if (!isBoolean) {
            return false;
        }
        LogUtils.e(TAG, "傳送的資料為:" + json);
        Log.e(TAG, "sendData: 傳送的資料為:" + json);
        return MqttManager.getInstance().publish(
                Mqtt.Mqtt_TopService + SharedPreferencesUtil.getString("device_id"), 2, json.getBytes());
    }

    /**
     * Publishes a message through the current client.
     *
     * @param topicName topic to publish to
     * @param qos       quality of service set on the message
     * @param payload   message body
     * @return {@code true} if the client accepted the message, {@code false} if there is no
     *         connected client or the publish call threw
     */
    public boolean publish(String topicName, int qos, byte[] payload) {
        MqttClient current = client;
        if (current == null || !current.isConnected()) {
            return false;
        }
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        try {
            current.publish(topicName, message);
            return true;
        } catch (MqttException e) {
            Log.e(TAG, "publish:MqttException: " + e.toString());
            return false;
        }
    }

    /**
     * Subscribes the current client to a topic and posts the outcome on the {@link EventBus}.
     *
     * @param topicName topic filter to subscribe to
     * @param qos       requested quality of service
     * @return {@code true} if the subscription succeeded, {@code false} if there is no
     *         connected client or the subscribe call threw
     */
    public boolean subScribe(String topicName, int qos) {
        MqttClient current = client;
        if (current == null || !current.isConnected()) {
            return false;
        }
        try {
            current.subscribe(topicName, qos);
            EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("訂閱成功", Mqtt.CONNECTION_SUCCESS)));
            return true;
        } catch (MqttException e) {
            EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("訂閱異常" + e.toString(), Mqtt.CONNECTION_EXCEPTION)));
            return false;
        }
    }

    /**
     * Disconnects the current client if it is connected and posts a disconnect event.
     *
     * @throws MqttException if the client fails to disconnect
     */
    public void disConnect() throws MqttException {
        MqttClient current = client;
        if (current != null && current.isConnected()) {
            current.disconnect();
            EventBus.getDefault().post(new MqttMsgEvent(JsonUtil.tipsInfo("斷開連線", Mqtt.DISCONNECT)));
        }
    }

    /**
     * Disconnects (if needed) and closes the current client, then clears the reference.
     * Failures are logged rather than thrown because this is cleanup of a client that is
     * being discarded.
     */
    private void closeClient() {
        MqttClient old = client;
        if (old == null) {
            return;
        }
        client = null;
        try {
            if (old.isConnected()) {
                old.disconnect();
            }
        } catch (MqttException e) {
            Log.e(TAG, "closeClient: disconnect failed: " + e.toString());
        }
        try {
            old.close();
        } catch (MqttException e) {
            Log.e(TAG, "closeClient: close failed: " + e.toString());
        }
    }

    private final MqttCallback mqttCallback = new MqttCallback() {
        /**
         * Called when a message arrives from the server; forwards its text on the EventBus.
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            LogUtils.e(TAG, "Server:" + message.toString());
            EventBus.getDefault().post(new MqttMsgEvent(message.toString()));
        }

        /**
         * Called when the connection to the server is lost: reports the loss, stops the
         * heartbeat and starts a reconnect.
         */
        @Override
        public void connectionLost(Throwable cause) {
            if (cause != null) {
                LogUtils.e(TAG, cause.getMessage());
            }
            EventBus.getDefault().post(new MqttMsgEvent(
                    JsonUtil.tipsInfo("與服務端連線丟失" + String.valueOf(cause), Mqtt.LOST_CONNECTION)));
            isBoolean = false;
            stopHbPacket();
            Connect();
        }

        /**
         * Called when delivery of a published message completes; nothing to do here.
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    };

    /**
     * Releases the singleton and the resources it holds: stops the periodic reconnect check
     * and the heartbeat timer, disconnects and closes the client, and clears the instance.
     *
     * <p>The instance is cleared even when disconnecting throws.
     *
     * @throws Exception if disconnecting the client fails
     */
    public static void release() throws Exception {
        MqttManager instance;
        synchronized (MqttManager.class) {
            instance = mInstance;
        }
        if (instance == null) {
            return;
        }
        // Stop the background tasks first so they cannot trigger a reconnect during teardown.
        instance.stopReconnect();
        stopHbPacket();
        try {
            instance.disConnect();
        } finally {
            instance.closeClient();
            synchronized (MqttManager.class) {
                if (mInstance == instance) {
                    mInstance = null;
                }
            }
        }
    }
}
import com.sqy.vending.mqttvending_master.util.SharedPreferencesUtil;
/**
* Created by Administrator on 2018/6/11.
*/
/**
 * Holder for the MQTT settings and status codes used by {@code MqttManager}.
 *
 * <p>The settings are plain mutable statics that default to empty strings; nothing in this
 * class assigns them.
 */
public class Mqtt {

    // ---- Status codes attached to the tip events that MqttManager posts ----

    /** Posted after a successful subscription. */
    public static final String CONNECTION_SUCCESS = "0";
    /** Posted when subscribing throws. */
    public static final String CONNECTION_EXCEPTION = "1";
    /** Posted after the client has been disconnected on request. */
    public static final String DISCONNECT = "2";
    /** Posted when the connection to the server is lost. */
    public static final String LOST_CONNECTION = "3";
    /** Posted when a connection attempt fails. */
    public static final String CONNECTION_FAIL = "4";

    // ---- Connection settings ----

    /** Broker URL; MqttManager passes it as the {@code brokerUrl} of {@code createConnect}. */
    public static String Mqtt_Url = "";

    /** MqttManager passes this as the {@code userName} of {@code createConnect}. */
    public static String Mqtt_UserName = "";

    /**
     * MqttManager passes this as the {@code password} argument of {@code createConnect}.
     * NOTE(review): the name suggests a port number — confirm which is intended.
     */
    public static String Mqtt_Port = "";

    /** Topic prefix; MqttManager subscribes to this value followed by the device id. */
    public static String Mqtt_TopDevice = "";

    /** Topic prefix; MqttManager publishes to this value followed by the device id. */
    public static String Mqtt_TopService = "";

    // ---- Runtime state ----

    /**
     * Connection flag: MqttManager sets it after a successful connect and clears it when the
     * connection is lost or found to be down.
     */
    public static volatile boolean isBoolean = false;
}