1. 程式人生 > 實用技巧 >Android MQTT的訂閱和釋出訊息

Android MQTT的訂閱和釋出訊息

Android MQTT的訂閱和釋出訊息

MQTT協議簡述

MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是基於釋出/訂閱(Publish/Subscribe)模式的"輕量級"通訊協議,該協議構建於TCP/IP協議上,有IBM在1999年釋出.MQTT最大的優點在於:可以以極少的程式碼和有限的寬頻,為連線遠端裝置提供可靠的訊息服務.。作為一種開銷、低寬頻佔用的即時通訊協議,在使其物聯網、小型裝置、移動應用等方面較廣泛的應用。

MQTTMQTT是一個基於客戶端-伺服器的訊息釋出/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通訊和物聯網(IoT)。其在,通過衛星鏈路通訊感測器、偶爾撥號的醫療裝置、智慧家居、及一些小型化裝置中已廣泛使用。

特點

1、實現簡單

2、提供資料傳輸的Qos

3、輕量、佔用寬頻低

4、有三種訊息釋出服務質量

  • "至多一次",訊息釋出完成依賴底層TCP/IP網路。會發生訊息丟失或重複。這一級可用於如下情況,壞境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送
  • "至少一次",確保訊息到達,但訊息重複可能會發生。
  • "只有一次",確保訊息到達一次,這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。

5、可傳輸任意型別的資料

6、可保持的會話(session)

MQTT協議原理

1 MQTT協議實現方式

實現MQTT協議需要客戶端和伺服器端通訊完成,在通訊過程中,MQTT協議中有三種身份:釋出者(Publish)、代理(Broker)(伺服器)、訂閱者(Subscribe)。其中,訊息的釋出者和訂閱者都是客戶端,訊息代理是伺服器,訊息釋出者可以同時是訂閱者。

MQTT傳輸的訊息分為:主題(Topic)和負載(payload)兩部分:

  • (1)Topic,可以理解為訊息的型別,訂閱者訂閱(Subscribe)後,就會收到該主題的訊息內容(payload);
  • (2)payload,可以理解為訊息的內容,是指訂閱者具體要使用的內容。

2 網路傳輸與應用訊息

MQTT會構建底層網路傳輸:它將建立客戶端到伺服器的連線,提供兩者之間的一個有序的、無損的、基於位元組流的雙向傳輸。

當應用資料通過MQTT網路傳送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。

3 MQTT客戶端

一個使用MQTT協議的應用程式或者裝置,它總是建立到伺服器的網路連線。客戶端可以:

  • (1)釋出其他客戶端可能會訂閱的資訊;
  • (2)訂閱其它客戶端釋出的訊息;
  • (3)退訂或刪除應用程式的訊息;
  • (4)斷開與伺服器連線。

4 MQTT伺服器

MQTT伺服器以稱為"訊息代理"(Broker),可以是一個應用程式或一臺裝置。它是位於訊息釋出者和訂閱者之間,它可以:

  • (1)接受來自客戶的網路連線;
  • (2)接受客戶釋出的應用資訊;
  • (3)處理來自客戶端的訂閱和退訂請求;
  • (4)向訂閱的客戶轉發應用程式訊息。

5 MQTT協議中的訂閱、主題、會話

一、訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。

二、會話(Session)

每個客戶端與伺服器建立連線後就是一個會話,客戶端和伺服器之間有狀態互動。會話存在於一個網路之間,也可能在客戶端和伺服器之間跨越多個連續的網路連線。

三、主題名(Topic Name)

連線到一個應用程式訊息的標籤,該標籤與伺服器的訂閱相匹配。伺服器會將訊息傳送給訂閱所匹配標籤的每個客戶端。

四、主題篩選器(Topic Filter)

一個對主題名萬用字元篩選器,在訂閱表示式中使用,表示訂閱所匹配到的多個主題。

五、負載(Payload)

訊息訂閱者所具體接收的內容。

6 MQTT協議中的方法

MQTT協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的資料或動態生成資料,這取決於伺服器的實現。通常來說,資源指伺服器上的檔案或輸出。主要方法有:

  • (1)Connect。等待與伺服器建立連線。
  • (2)Disconnect。等待MQTT客戶端完成所做的工作,並與伺服器斷開TCP/IP會話。
  • (3)Subscribe。等待完成訂閱。
  • (4)UnSubscribe。等待伺服器取消客戶端的一個或多個topics訂閱。
  • (5)Publish。MQTT客戶端傳送訊息請求,傳送完成後返回應用程式執行緒。

Android 下如何使用MQTT協議

  • 匯入mqtt包
  • 配置MqttConnectOptions
  • 呼叫connect並將配置好的引數寫入
  • 通過指定的訊息進行訊息訂閱
  • 向訂閱topic對中釋出訊息
  • 通過mqttCallBack的回撥對接收到的訊息進行處理

匯入mqtt包

1 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 
2 implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

程式碼

  1 public class MQTTService extends Service {
  2 
  3     public static final String TAG = MQTTService.class.getSimpleName();
  4 
  5     private static MqttAndroidClient client;
  6     private MqttConnectOptions conOpt;
  7 
  8     private String host = "tcp://xxx";   //伺服器地址 
  9     private String userName = "xxx";   //賬號
 10     private String passWord = " xxx";   //密碼
 11     private static String myTopic = "topic";   //頻道名
 12     private String clientId = "mqtt_client";   //客戶端ID
 13 
 14     @Override
 15     public int onStartCommand(Intent intent, int flags, int startId) {
 16         init();
 17         return super.onStartCommand(intent, flags, startId);
 18     }
 19   //釋出訊息 msg
 20     public static void publish(String msg){
 21         String topic = myTopic;
 22         Integer qos = 2;
 23         Boolean retained = false;
 24         try {
 25             client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
 26         } catch (MqttException e) {
 27             e.printStackTrace();
 28         }
 29     }
 30 
 31     private void init() {
 32         // 伺服器地址(協議+地址+埠號)
 33         String uri = host;
 34         client = new MqttAndroidClient(this, uri, clientId);
 35         // 設定MQTT監聽並且接受訊息
 36         client.setCallback(mqttCallback);
 37 
 38         conOpt = new MqttConnectOptions();
 39         // 清除快取
 40         conOpt.setCleanSession(true);
 41         // 設定超時時間,單位:秒
 42         conOpt.setConnectionTimeout(10);
 43         // 心跳包傳送間隔,單位:秒
 44         conOpt.setKeepAliveInterval(20);
 45         // 使用者名稱
 46         conOpt.setUserName(userName);
 47         // 密碼
 48         conOpt.setPassword(passWord.toCharArray());
 49 
 50         // last will message
 51         boolean doConnect = true;
 52         String message = "{\"terminal_uid\":\"" + clientId + "\"}";
 53         String topic = myTopic;
 54         Integer qos = 2;
 55         Boolean retained = false;
 56         if ((!message.equals("")) || (!topic.equals(""))) {
 57             // 最後
 58             try {
 59                 conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue());
 60             } catch (Exception e) {
 61                 Log.i(TAG, "Exception Occured", e);
 62                 doConnect = false;
 63                 iMqttActionListener.onFailure(null, e);
 64             }
 65         }
 66 
 67         if (doConnect) {
 68             doClientConnection();
 69         }
 70 
 71     }
 72 
 73     @Override
 74     public void onDestroy() {
 75         try {
 76             client.disconnect();  //服務銷燬,斷開連線
 77         } catch (MqttException e) {
 78             e.printStackTrace();
 79         }
 80         super.onDestroy();
 81     }
 82 
 83     /** 連線MQTT伺服器 */
 84     private void doClientConnection() {
 85         if (!client.isConnected() && isConnectIsNomarl()) {
 86             try {
 87                 client.connect(conOpt, null, iMqttActionListener);
 88             } catch (MqttException e) {
 89                 e.printStackTrace();
 90             }
 91         }
 92 
 93     }
 94 
 95     // MQTT是否連線成功
 96     private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
 97 
 98         @Override
 99         public void onSuccess(IMqttToken arg0) {
100             Log.i(TAG, "連線成功 ");
101             try {
102                 // 訂閱myTopic話題,當訂閱多條頻道,需要遍歷逐條訂閱,否則有可能訂閱失敗
103                 client.subscribe(myTopic,1);  
104             } catch (MqttException e) {
105                 e.printStackTrace();
106             }
107         }
108 
109         @Override
110         public void onFailure(IMqttToken arg0, Throwable arg1) {
111             arg1.printStackTrace();
112             // 連線失敗,重連
113         }
114     };
115 
116     // MQTT監聽並且接受訊息
117     private MqttCallback mqttCallback = new MqttCallback() {
118 
119         @Override
120         public void messageArrived(String topic, MqttMessage message) throws Exception {
121 
122             String str1 = new String(message.getPayload());
123             MQTTMessage msg = new MQTTMessage(); //自定義介面
124             msg.setMessage(str1);
125           //訂閱資訊,接收的資訊message
126             String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
127             Log.i(TAG, "messageArrived:" + str1);
128             Log.i(TAG, str2);
129         }
130 
131         @Override
132         public void deliveryComplete(IMqttDeliveryToken arg0) {
133 
134         }
135 
136         @Override
137         public void connectionLost(Throwable arg0) {
138             // 失去連線,重連
139         }
140     };
141 
142     /** 判斷網路是否連線 */
143     private boolean isConnectIsNomarl() {
144         ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
145         NetworkInfo info = connectivityManager.getActiveNetworkInfo();
146         if (info != null && info.isAvailable()) {
147             String name = info.getTypeName();
148             Log.i(TAG, "MQTT當前網路名稱:" + name);
149             return true;
150         } else {
151             Log.i(TAG, "MQTT 沒有可用網路");
152             return false;
153         }
154     }
155 
156     @Nullable
157     @Override
158     public IBinder onBind(Intent intent) {
159         return null;
160     }
161 }

 連線成功後,就可以實現和服務端訊息的傳送和接收。

最後在AndroidManifest.xml檔案

註冊Services

1 <service android:name="org.eclipse.paho.android.service.MqttService" />   <!--匯入包,Android自帶Mqtt的服務,需要註冊-->
2 <service android:name=".mqtttest.MQService"/>         <!--自己開啟的服務-->

許可權

1     <uses-permission android:name="android.permission.WAKE_LOCK" />
2     <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
3     <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
4     <uses-permission android:name="android.permission.READ_PHONE_STATE" />
5     <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" />
6     <uses-permission android:name="android.permission.INTERNET" />

完結!