Android中MQTT協議的使用
歡迎在我的公眾號aserbao給我留言,無償服務!同時,歡迎大家來加入微信群二維碼討論群,一起討論Android開發技術!群二維碼定時在我公眾號更新!
文章目錄
前言
專案中有用到mqtt,碰巧沒人負責這一塊,所以毛遂自薦就看了一波,下面是一些簡單的使用記錄,寫得不好,僅供參考。若沒有mqtt伺服器的朋友,建議先建一個mqtt服務,不然看不到效果。
什麼是Mqtt?
MQTT 的全稱為 Message Queue Telemetry Transport,是輕量級基於代理的釋出/訂閱的訊息傳輸協議,它可以通過很少的程式碼和頻寬和遠端裝置連線。例如通過衛星和代理連線,通過撥號和醫療保健提供者連線,以及在一些自動化或小型裝置上,而且由於小巧,省電,協議開銷小和能高效的向一和多個接收者傳遞資訊,故同樣適用於稱動應用裝置上。MQTT就包含了以下一些特點:
- 實現簡單
- 提供資料傳輸的 QoS
- 輕量、佔用頻寬低
- 可傳輸任意型別的資料
- 可保持的會話(session)
Android 下如何使用Mqtt?
在Android中使用Mqtt可以分為6個步驟:
- 匯入mqtt包;
- 配置MqttConnectOptions;
- 呼叫connect並將配置好的引數寫入;
- 通過指定的訊息進行訊息訂閱;
- 向訂閱的topic中釋出訊息;
- 通過mqttCallBack的回撥對接收到的訊息進行處理;
// mqtt 包匯入 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
匯入類:
public class MQTTManager { private static final String TAG = "MQTTManager"; public static final String SERVER_HOST = "tcp://52.80.116.245:1883"; private String clientid = "2df8aabfb8b6088953664f413a446bbc"; private static MQTTManager mqttManager=null; private MqttClient client; private MqttConnectOptions options; private Context mContext; private MessageHandlerCallBack callBack; private MQTTManager(Context context){ mContext = context; clientid+=MqttClient.generateClientId(); } /** * 獲取一個MQTTManager單例 * @param context * @return 返回一個MQTTManager的例項物件 */ public static MQTTManager getInstance(Context context) { Log.d(TAG,"mqttManager="+mqttManager); if (mqttManager==null) { mqttManager=new MQTTManager(context); synchronized (Object.class) { Log.d(TAG,"synchronized mqttManager="+mqttManager); if (mqttManager!=null) { return mqttManager; } } }else { Log.d(TAG,"else mqttManager="+mqttManager); return mqttManager; } return null; } /** * 連線伺服器 */ public void connect(){ Log.d(TAG,"開始連線MQtt"); try { // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 client = new MqttClient(SERVER_HOST, "2df8aabfb8b6088953664f413a446bbc", new MemoryPersistence()); // MQTT的連線設定 options = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 options.setCleanSession(true); // 設定連線的使用者名稱 options.setUserName("7302"); // 設定連線的密碼 options.setPassword("64ec6f32366ccb80f0dacc804546d62e623e4b72".toCharArray()); // 設定超時時間 單位為秒 options.setConnectionTimeout(30); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 options.setKeepAliveInterval(30); // 設定回撥 // MqttTopic topic = client.getTopic(TOPIC); //setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 // options.setWill(topic, "close".getBytes(), 2, true); SSLSocketFactory sslSocketFactory = null; /* try { sslSocketFactory = sslContextFromStream(mContext.getAssets().open("server.pem")).getSocketFactory(); } catch (Exception e) { e.printStackTrace(); } options.setSocketFactory(sslSocketFactory);*/ client.setCallback(new PushCallback()); client.connect(options); Log.d(TAG,"ClientId="+client.getClientId()); } catch (MqttException e) { e.printStackTrace(); Log.e(TAG, "connect: " + e ); } } /** * 訂閱訊息 * @param topic 訂閱訊息的主題 */ public void subscribeMsg(String topic,int qos){ if (client!=null) { int[] Qos = {qos}; String[] topic1 = {topic}; try { client.subscribe(topic1, Qos); Log.d(TAG,"開始訂閱topic="+topic); } catch (MqttException e) { e.printStackTrace(); } } } /** * 釋出訊息 * @param topic 釋出訊息主題 * @param msg 訊息體 * @param isRetained 是否為保留訊息 */ public void publish(String topic,String msg,boolean isRetained,int qos) { try { if (client!=null) { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(isRetained); message.setPayload(msg.getBytes()); client.publish(topic, message); Log.d(TAG,"topic="+topic+"--msg="+msg+"--isRetained"+isRetained); } } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } int count=0; /** * 釋出和訂閱訊息的回撥 * */ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { Log.e(TAG, "connectionLost: " + cause ); if (count<5) { count++;//5次重連 Log.d(TAG,"斷開連線,重新連線"+count+"次"+cause); try { client.close(); connect(); } catch (MqttException e) { e.printStackTrace(); } } } /** * 釋出訊息的回撥 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { //publish後會執行到這裡 Log.d(TAG,"釋出訊息成功的回撥"+token.isComplete()); } /** * 接收訊息的回撥方法 */ @Override public void messageArrived(final String topicName, final MqttMessage message) throws Exception { //subscribe後得到的訊息會執行到這裡面 Log.d(TAG,"接收訊息=="+new String(message.getPayload())); if (callBack!=null) { callBack.messageSuccess(topicName,new String(message.getPayload())); } } } /** * 設定接收訊息的回撥方法 * @param callBack */ public void setMessageHandlerCallBack(MessageHandlerCallBack callBack){ this.callBack = callBack; } public MessageHandlerCallBack getMessageHandlerCallBack(){ if (callBack!=null) { return callBack; } return null; } /** * 斷開連結 */ public void disconnect(){ if (client!=null&&client.isConnected()) { try { client.disconnect(); mqttManager=null; } catch (MqttException e) { e.printStackTrace(); } } } /** * 釋放資源 */ public void release(){ if (mqttManager!=null) { mqttManager=null; } } /** * 判斷服務是否連線 * @return */ public boolean isConnected(){ if (client!=null) { return client.isConnected(); } return false; } public SSLContext sslContextFromStream(InputStream inputStream) throws Exception { CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509"); Certificate certificate = certificateFactory.generateCertificate(inputStream); KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); keyStore.load(null, null); keyStore.setCertificateEntry("ca", certificate); TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); trustManagerFactory.init(keyStore); SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, trustManagerFactory.getTrustManagers(), null); return sslContext; } }
呼叫:
MQTTManager instance = MQTTManager.getInstance(mContext);
instance.setMessageHandlerCallBack(new MessageHandlerCallBack() {
@Override
public void messageSuccess(String topicName, String s) {
Log.e(TAG, "messageSuccess: " + s);
}
});
instance.connect();
連線成功後,就可以實現和服務端訊息的傳送和接收。
專案地址
AserbaosAndroid
aserbao的個人Android總結專案,希望這個專案能成為最全面的Android開發學習專案,這是個美好的願景,專案中還有很多未涉及到的地方,有很多沒有講到的點,希望看到這個專案的朋友,如果你在開發中遇到什麼問題,在這個專案中沒有找到對應的解決辦法,希望你能夠提出來,給我留言或者在專案github地址提issues,我有時間就會更新專案沒有涉及到的部分!專案會一直維護下去。當然,我希望是Aserbao’sAndroid 能為所有Android開發者提供到幫助!也期望更多Android開發者能參與進來,只要你熟悉Android某一塊,都可以將你的程式碼pull上分支供大家學習!
總結
mqtt Android客戶端的程式碼很有限,沒有過多的操作!簡單的配置,連線,然後接受回撥處理接受訊息就可以了,協議內容今後有時間再細看吧!