Android MQTT的訂閱和釋出訊息
Android MQTT的訂閱和釋出訊息
MQTT協議簡述
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是基於釋出/訂閱(Publish/Subscribe)模式的"輕量級"通訊協議,該協議構建於TCP/IP協議上,有IBM在1999年釋出.MQTT最大的優點在於:可以以極少的程式碼和有限的寬頻,為連線遠端裝置提供可靠的訊息服務.。作為一種開銷、低寬頻佔用的即時通訊協議,在使其物聯網、小型裝置、移動應用等方面較廣泛的應用。
MQTTMQTT是一個基於客戶端-伺服器的訊息釋出/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通訊和物聯網(IoT)。其在,通過衛星鏈路通訊感測器、偶爾撥號的醫療裝置、智慧家居、及一些小型化裝置中已廣泛使用。
特點
1、實現簡單
2、提供資料傳輸的Qos
3、輕量、佔用寬頻低
4、有三種訊息釋出服務質量
- "至多一次",訊息釋出完成依賴底層TCP/IP網路。會發生訊息丟失或重複。這一級可用於如下情況,壞境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送
- "至少一次",確保訊息到達,但訊息重複可能會發生。
- "只有一次",確保訊息到達一次,這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。
5、可傳輸任意型別的資料
6、可保持的會話(session)
MQTT協議原理
1 MQTT協議實現方式
實現MQTT協議需要客戶端和伺服器端通訊完成,在通訊過程中,MQTT協議中有三種身份:釋出者(Publish)、代理(Broker)(伺服器)、訂閱者(Subscribe)。其中,訊息的釋出者和訂閱者都是客戶端,訊息代理是伺服器,訊息釋出者可以同時是訂閱者。
MQTT傳輸的訊息分為:主題(Topic)和負載(payload)兩部分:
- (1)Topic,可以理解為訊息的型別,訂閱者訂閱(Subscribe)後,就會收到該主題的訊息內容(payload);
- (2)payload,可以理解為訊息的內容,是指訂閱者具體要使用的內容。
2 網路傳輸與應用訊息
MQTT會構建底層網路傳輸:它將建立客戶端到伺服器的連線,提供兩者之間的一個有序的、無損的、基於位元組流的雙向傳輸。
當應用資料通過MQTT網路傳送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。
3 MQTT客戶端
一個使用MQTT協議的應用程式或者裝置,它總是建立到伺服器的網路連線。客戶端可以:
- (1)釋出其他客戶端可能會訂閱的資訊;
- (2)訂閱其它客戶端釋出的訊息;
- (3)退訂或刪除應用程式的訊息;
- (4)斷開與伺服器連線。
4 MQTT伺服器
MQTT伺服器以稱為"訊息代理"(Broker),可以是一個應用程式或一臺裝置。它是位於訊息釋出者和訂閱者之間,它可以:
- (1)接受來自客戶的網路連線;
- (2)接受客戶釋出的應用資訊;
- (3)處理來自客戶端的訂閱和退訂請求;
- (4)向訂閱的客戶轉發應用程式訊息。
5 MQTT協議中的訂閱、主題、會話
一、訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
二、會話(Session)
每個客戶端與伺服器建立連線後就是一個會話,客戶端和伺服器之間有狀態互動。會話存在於一個網路之間,也可能在客戶端和伺服器之間跨越多個連續的網路連線。
三、主題名(Topic Name)
連線到一個應用程式訊息的標籤,該標籤與伺服器的訂閱相匹配。伺服器會將訊息傳送給訂閱所匹配標籤的每個客戶端。
四、主題篩選器(Topic Filter)
一個對主題名萬用字元篩選器,在訂閱表示式中使用,表示訂閱所匹配到的多個主題。
五、負載(Payload)
訊息訂閱者所具體接收的內容。
6 MQTT協議中的方法
MQTT協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的資料或動態生成資料,這取決於伺服器的實現。通常來說,資源指伺服器上的檔案或輸出。主要方法有:
- (1)Connect。等待與伺服器建立連線。
- (2)Disconnect。等待MQTT客戶端完成所做的工作,並與伺服器斷開TCP/IP會話。
- (3)Subscribe。等待完成訂閱。
- (4)UnSubscribe。等待伺服器取消客戶端的一個或多個topics訂閱。
- (5)Publish。MQTT客戶端傳送訊息請求,傳送完成後返回應用程式執行緒。
Android 下如何使用MQTT協議
- 匯入mqtt包
- 配置MqttConnectOptions
- 呼叫connect並將配置好的引數寫入
- 通過指定的訊息進行訊息訂閱
- 向訂閱topic對中釋出訊息
- 通過mqttCallBack的回撥對接收到的訊息進行處理
匯入mqtt包
1 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' 2 implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
程式碼
1 public class MQTTService extends Service { 2 3 public static final String TAG = MQTTService.class.getSimpleName(); 4 5 private static MqttAndroidClient client; 6 private MqttConnectOptions conOpt; 7 8 private String host = "tcp://xxx"; //伺服器地址 9 private String userName = "xxx"; //賬號 10 private String passWord = " xxx"; //密碼 11 private static String myTopic = "topic"; //頻道名 12 private String clientId = "mqtt_client"; //客戶端ID 13 14 @Override 15 public int onStartCommand(Intent intent, int flags, int startId) { 16 init(); 17 return super.onStartCommand(intent, flags, startId); 18 } 19 //釋出訊息 msg 20 public static void publish(String msg){ 21 String topic = myTopic; 22 Integer qos = 2; 23 Boolean retained = false; 24 try { 25 client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue()); 26 } catch (MqttException e) { 27 e.printStackTrace(); 28 } 29 } 30 31 private void init() { 32 // 伺服器地址(協議+地址+埠號) 33 String uri = host; 34 client = new MqttAndroidClient(this, uri, clientId); 35 // 設定MQTT監聽並且接受訊息 36 client.setCallback(mqttCallback); 37 38 conOpt = new MqttConnectOptions(); 39 // 清除快取 40 conOpt.setCleanSession(true); 41 // 設定超時時間,單位:秒 42 conOpt.setConnectionTimeout(10); 43 // 心跳包傳送間隔,單位:秒 44 conOpt.setKeepAliveInterval(20); 45 // 使用者名稱 46 conOpt.setUserName(userName); 47 // 密碼 48 conOpt.setPassword(passWord.toCharArray()); 49 50 // last will message 51 boolean doConnect = true; 52 String message = "{\"terminal_uid\":\"" + clientId + "\"}"; 53 String topic = myTopic; 54 Integer qos = 2; 55 Boolean retained = false; 56 if ((!message.equals("")) || (!topic.equals(""))) { 57 // 最後 58 try { 59 conOpt.setWill(topic, message.getBytes(), qos.intValue(), retained.booleanValue()); 60 } catch (Exception e) { 61 Log.i(TAG, "Exception Occured", e); 62 doConnect = false; 63 iMqttActionListener.onFailure(null, e); 64 } 65 } 66 67 if (doConnect) { 68 doClientConnection(); 69 } 70 71 } 72 73 @Override 74 public void onDestroy() { 75 try { 76 client.disconnect(); //服務銷燬,斷開連線 77 } catch (MqttException e) { 78 e.printStackTrace(); 79 } 80 super.onDestroy(); 81 } 82 83 /** 連線MQTT伺服器 */ 84 private void doClientConnection() { 85 if (!client.isConnected() && isConnectIsNomarl()) { 86 try { 87 client.connect(conOpt, null, iMqttActionListener); 88 } catch (MqttException e) { 89 e.printStackTrace(); 90 } 91 } 92 93 } 94 95 // MQTT是否連線成功 96 private IMqttActionListener iMqttActionListener = new IMqttActionListener() { 97 98 @Override 99 public void onSuccess(IMqttToken arg0) { 100 Log.i(TAG, "連線成功 "); 101 try { 102 // 訂閱myTopic話題,當訂閱多條頻道,需要遍歷逐條訂閱,否則有可能訂閱失敗 103 client.subscribe(myTopic,1); 104 } catch (MqttException e) { 105 e.printStackTrace(); 106 } 107 } 108 109 @Override 110 public void onFailure(IMqttToken arg0, Throwable arg1) { 111 arg1.printStackTrace(); 112 // 連線失敗,重連 113 } 114 }; 115 116 // MQTT監聽並且接受訊息 117 private MqttCallback mqttCallback = new MqttCallback() { 118 119 @Override 120 public void messageArrived(String topic, MqttMessage message) throws Exception { 121 122 String str1 = new String(message.getPayload()); 123 MQTTMessage msg = new MQTTMessage(); //自定義介面 124 msg.setMessage(str1); 125 //訂閱資訊,接收的資訊message 126 String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained(); 127 Log.i(TAG, "messageArrived:" + str1); 128 Log.i(TAG, str2); 129 } 130 131 @Override 132 public void deliveryComplete(IMqttDeliveryToken arg0) { 133 134 } 135 136 @Override 137 public void connectionLost(Throwable arg0) { 138 // 失去連線,重連 139 } 140 }; 141 142 /** 判斷網路是否連線 */ 143 private boolean isConnectIsNomarl() { 144 ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); 145 NetworkInfo info = connectivityManager.getActiveNetworkInfo(); 146 if (info != null && info.isAvailable()) { 147 String name = info.getTypeName(); 148 Log.i(TAG, "MQTT當前網路名稱:" + name); 149 return true; 150 } else { 151 Log.i(TAG, "MQTT 沒有可用網路"); 152 return false; 153 } 154 } 155 156 @Nullable 157 @Override 158 public IBinder onBind(Intent intent) { 159 return null; 160 } 161 }
連線成功後,就可以實現和服務端訊息的傳送和接收。
最後在AndroidManifest.xml檔案
註冊Services
1 <service android:name="org.eclipse.paho.android.service.MqttService" /> <!--匯入包,Android自帶Mqtt的服務,需要註冊--> 2 <service android:name=".mqtttest.MQService"/> <!--自己開啟的服務-->
許可權
1 <uses-permission android:name="android.permission.WAKE_LOCK" /> 2 <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> 3 <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> 4 <uses-permission android:name="android.permission.READ_PHONE_STATE" /> 5 <uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE" /> 6 <uses-permission android:name="android.permission.INTERNET" />