1. 程式人生 > >Android中MQTT協議的使用

Android中MQTT協議的使用

歡迎在我的公眾號aserbao給我留言,無償服務!同時,歡迎大家來加入微信群二維碼討論群,一起討論Android開發技術!群二維碼定時在我公眾號更新!
在這裡插入圖片描述

文章目錄

前言

專案中有用到mqtt,碰巧沒人負責這一塊,所以毛遂自薦就看了一波,下面是一些簡單的使用記錄,寫得不好,僅供參考。若沒有mqtt伺服器的朋友,建議先建一個mqtt服務,不然看不到效果。

什麼是Mqtt?

MQTT 的全稱為 Message Queue Telemetry Transport,是輕量級基於代理的釋出/訂閱的訊息傳輸協議,它可以通過很少的程式碼和頻寬和遠端裝置連線。例如通過衛星和代理連線,通過撥號和醫療保健提供者連線,以及在一些自動化或小型裝置上,而且由於小巧,省電,協議開銷小和能高效的向一和多個接收者傳遞資訊,故同樣適用於稱動應用裝置上。MQTT就包含了以下一些特點:

  1. 實現簡單
  2. 提供資料傳輸的 QoS
  3. 輕量、佔用頻寬低
  4. 可傳輸任意型別的資料
  5. 可保持的會話(session)

Android 下如何使用Mqtt?

在Android中使用Mqtt可以分為6個步驟:

  • 匯入mqtt包;
  • 配置MqttConnectOptions;
  • 呼叫connect並將配置好的引數寫入;
  • 通過指定的訊息進行訊息訂閱;
  • 向訂閱的topic中釋出訊息;
  • 通過mqttCallBack的回撥對接收到的訊息進行處理;
// mqtt 包匯入
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

匯入類:

public class MQTTManager {
    private static final String TAG = "MQTTManager";
        public static final String SERVER_HOST = "tcp://52.80.116.245:1883";
        private String clientid = "2df8aabfb8b6088953664f413a446bbc";
        private static MQTTManager mqttManager=null;
        private MqttClient client;
        private MqttConnectOptions options;
        private Context mContext;

        private MessageHandlerCallBack callBack;
        private MQTTManager(Context context){
            mContext = context;
            clientid+=MqttClient.generateClientId();
        }

        /**
         * 獲取一個MQTTManager單例
         * @param context
         * @return 返回一個MQTTManager的例項物件
         */
        public static MQTTManager getInstance(Context context) {
            Log.d(TAG,"mqttManager="+mqttManager);
            if (mqttManager==null) {
                mqttManager=new MQTTManager(context);
                synchronized (Object.class) {
                    Log.d(TAG,"synchronized mqttManager="+mqttManager);
                    if (mqttManager!=null) {
                        return mqttManager;
                    }
                }
            }else {
                Log.d(TAG,"else mqttManager="+mqttManager);
                return mqttManager;
            }
            return null;
        }
        /**
         * 連線伺服器
         */
        public void connect(){
            Log.d(TAG,"開始連線MQtt");
            try {
                // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存    
                client = new MqttClient(SERVER_HOST, "2df8aabfb8b6088953664f413a446bbc", new MemoryPersistence());
                // MQTT的連線設定    
                options = new MqttConnectOptions();
                // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線    
              options.setCleanSession(true);
                // 設定連線的使用者名稱    
                options.setUserName("7302");
                // 設定連線的密碼    
                options.setPassword("64ec6f32366ccb80f0dacc804546d62e623e4b72".toCharArray());
                // 設定超時時間 單位為秒    
                options.setConnectionTimeout(30);
                // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制    
                options.setKeepAliveInterval(30);
                // 設定回撥    
//              MqttTopic topic = client.getTopic(TOPIC);    
                //setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息      
//              options.setWill(topic, "close".getBytes(), 2, true);
                SSLSocketFactory sslSocketFactory = null;
               /* try {
                    sslSocketFactory = sslContextFromStream(mContext.getAssets().open("server.pem")).getSocketFactory();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                options.setSocketFactory(sslSocketFactory);*/
                client.setCallback(new PushCallback());
                client.connect(options);
                Log.d(TAG,"ClientId="+client.getClientId());
            } catch (MqttException e) {
                e.printStackTrace();
                Log.e(TAG, "connect: " + e );
            }
        }

        /**
         * 訂閱訊息
         * @param topic 訂閱訊息的主題
         */
        public void subscribeMsg(String topic,int qos){
            if (client!=null) {
                int[] Qos  = {qos};
                String[] topic1 = {topic};
                try {
                    client.subscribe(topic1, Qos);
                    Log.d(TAG,"開始訂閱topic="+topic);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * 釋出訊息
         * @param topic 釋出訊息主題
         * @param msg 訊息體
         * @param isRetained 是否為保留訊息
         */
        public void publish(String topic,String msg,boolean isRetained,int qos) {

            try {
                if (client!=null) {
                    MqttMessage message = new MqttMessage();
                    message.setQos(qos);
                    message.setRetained(isRetained);
                    message.setPayload(msg.getBytes());
                    client.publish(topic, message);
                    Log.d(TAG,"topic="+topic+"--msg="+msg+"--isRetained"+isRetained);
                }
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        int count=0;
        /**
         * 釋出和訂閱訊息的回撥
         *
         */
        public class PushCallback implements MqttCallback {

            public void connectionLost(Throwable cause) {
                Log.e(TAG, "connectionLost: " + cause );
                if (count<5) {
                    count++;//5次重連
                    Log.d(TAG,"斷開連線,重新連線"+count+"次"+cause);
                    try {
                        client.close();
                        connect();
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            }
            /**
             * 釋出訊息的回撥     
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                //publish後會執行到這裡  
                Log.d(TAG,"釋出訊息成功的回撥"+token.isComplete());
            }

            /**
             * 接收訊息的回撥方法
             */
            @Override
            public void messageArrived(final String topicName, final MqttMessage message)
                    throws Exception {
                //subscribe後得到的訊息會執行到這裡面    
                Log.d(TAG,"接收訊息=="+new String(message.getPayload()));
                if (callBack!=null) {
                    callBack.messageSuccess(topicName,new String(message.getPayload()));
                }
            }

        }
        /**
         *  設定接收訊息的回撥方法
         * @param callBack
         */
        public void setMessageHandlerCallBack(MessageHandlerCallBack callBack){
            this.callBack = callBack;
        }
        public MessageHandlerCallBack getMessageHandlerCallBack(){
            if (callBack!=null) {
                return callBack;
            }
            return null;
        }
        /**
         * 斷開連結
         */
        public void disconnect(){
            if (client!=null&&client.isConnected()) {
                try {
                    client.disconnect();
                    mqttManager=null;
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
        /**
         * 釋放資源
         */
        public void release(){
            if (mqttManager!=null) {
                mqttManager=null;
            }
        }
        /**
         *  判斷服務是否連線
         * @return
         */
        public boolean isConnected(){
            if (client!=null) {
                return client.isConnected();
            }
            return false;
        }

    public SSLContext sslContextFromStream(InputStream inputStream) throws Exception {

        CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
        Certificate certificate = certificateFactory.generateCertificate(inputStream);

        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setCertificateEntry("ca", certificate);

        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(keyStore);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustManagerFactory.getTrustManagers(), null);

        return sslContext;
    }
}

呼叫:

MQTTManager instance = MQTTManager.getInstance(mContext);
                instance.setMessageHandlerCallBack(new MessageHandlerCallBack() {
                    @Override
                    public void messageSuccess(String topicName, String s) {
                        Log.e(TAG, "messageSuccess: " + s);
                    }
                });
                instance.connect();

連線成功後,就可以實現和服務端訊息的傳送和接收。

專案地址

AserbaosAndroid
aserbao的個人Android總結專案,希望這個專案能成為最全面的Android開發學習專案,這是個美好的願景,專案中還有很多未涉及到的地方,有很多沒有講到的點,希望看到這個專案的朋友,如果你在開發中遇到什麼問題,在這個專案中沒有找到對應的解決辦法,希望你能夠提出來,給我留言或者在專案github地址提issues,我有時間就會更新專案沒有涉及到的部分!專案會一直維護下去。當然,我希望是Aserbao’sAndroid 能為所有Android開發者提供到幫助!也期望更多Android開發者能參與進來,只要你熟悉Android某一塊,都可以將你的程式碼pull上分支供大家學習!

總結

mqtt Android客戶端的程式碼很有限,沒有過多的操作!簡單的配置,連線,然後接受回撥處理接受訊息就可以了,協議內容今後有時間再細看吧!

參考文章