MQTT3.1.1協議閱讀筆記1
本文主要是記錄閱讀 MQTT3.1.1協議中文版 時的心得感悟。
環境資訊
- 使用
Docker
執行emqx
,作為MQTT的服務端 - 使用
mqtt-spy.jar
作為MQTT的客戶端 - 使用
Paho
寫一個簡單的Java-MQTT客戶端 - 使用
WireShark
進行協議抓包
MQTT 簡介
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基於釋出/訂閱(publish/subscribe)模式的"輕量級"通訊協議,該協議構建於 TCP/IP 協議之上,由IBM在1999年釋出。。
一個 MQTT 控制報文包含三個部分:
組成部分 | 長度 | |
---|---|---|
固定報頭 | 2-5個位元組 | 存在於所有MQTT控制包 |
可變報頭 | 存在於某些MQTT控制包 | |
載荷 | 存在於某些MQTT控制包 |
- 我們藉助 MQTT 協議傳送的訊息內容儲存在載荷中
1.固定報頭
固定報頭由兩部分組成:控制包型別和剩餘長度
- 控制包型別目前有 14 種;
- 剩餘長度表示的是“可變報頭+載荷”的總長度;
如上圖所示,這是一條控制包型別為 CONNECT 的 MQTT 報文,固定報頭的中 剩餘長度用16進製表示為 0x1e,用10進製表示為 30。
從 1e 的後一個位元組 00 到末尾剛好是 30 個位元組。
1.1 剩餘長度與控制包最大長度256M
剩餘長度使用了一種可變長度的結構
用n個位元組表示剩餘長度 | 剩餘長度範圍起始值 | 剩餘長度範圍結束值 |
---|---|---|
1 | 0 (0x00) | 127 (0x7F) |
2 | 128 (0x80, 0x01) | 16 383 (0xFF, 0x7F) |
3 | 16 384 (0x80, 0x80, 0x01) | 2 097 151 (0xFF, 0xFF, 0x7F) |
4 | 2 097 152 (0x80, 0x80, 0x80, 0x01) | 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F) |
這將允許應用傳送可變報頭和載荷總長度為255M大小
換句話說,這將允許應用傳送最多256M大小的控制包。
2.可變報頭
以 CONNECT 報文的可變報頭為例,主要包含協議名稱(MQTT)和協議版本號(v3.1.1對應4);
2.1 MSB 和 LSB
至於 Length MSB(Most Significant Bit,最高有效位) 和 Length LSB (Last/Least Significant Bit,譯作最低有效位),
把 MSB 和 LSB 用大端位元組/網路位元組序來讀取,讀取的值可以表示協議名稱的長度。
以下是 Java 寫成的 Demo:
import java.io.*;
import java.util.Arrays;
public class Utf8Characters {
public static void main(String[] args) throws IOException {
// 模擬寫入
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(baos);
dataOut.writeUTF("MQTT");
byte[] bytes = baos.toByteArray();
System.out.println(Arrays.toString(bytes)); // 列印 [0, 4, 77, 81, 84, 84]
// 模擬讀取
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(bais);
int len = dataIn.readUnsignedShort(); // 2 bytes
byte[] decodedString = new byte[len]; // 4 bytes
dataIn.read(decodedString);
String target = new String(decodedString, "UTF-8");
System.out.println(target); // 列印 MQTT
// 重置一下,重新讀取
dataIn.reset();
// 等同於
String result = dataIn.readUTF();
System.out.println(result); // 列印 MQTT
}
}
3. MQTT的特別之處
我們在學習TCP/IP協議的時候,就知道 ACK 這個概念,其實許多構建在TCP/IP協議之上的應用層協議也都會使用 XXXACK 包來表示已經成功接收 XXX 資訊。
MQTT也不能“免俗”:
- 連線報文 CONNECT 對應連線確認報文 CONNACK;
- 訂閱報文 SUBSCRIBE 對應訂閱確認報文 SUBACK;
- 取消訂閱報文 UNSUBSCRIBE 對應取消訂閱確認報文 UNSUBACK;
- 釋出報文 PUBLISH 對應釋出確認報文 PUBACK。
還有就是名字中沒有使用 ACK,但是實際上也是“一問一答”式的 PINGREQ 和 PINGRESP。但是 MQTT 的控制型別中還是有兩處“怪異之處”:
- 唯獨 DISCONNECT 沒有對應的確認報文;
- PUBLISH 除了有 PUBACK 之外,還有 PUBREC,PUBREL,PUBCOMP
3.1 遺言/遺囑Will
對於一般IoT裝置而言,就是一個大迴圈while不斷接收訊息,不存在正常退出的邏輯,一般都是斷電斷網導致的異常退出。DISCONNECT 並不常用,也不用確認。
但是,如果客戶端正常發出了 DISCONNECT 報文,那麼服務端收到 DISCONNECT 後必須丟棄所有和當前連線有關的Will Message,不釋出。
我們通常都有判斷IoT裝置是否線上的需求,使用遺言機制就很好實現。
- 遺言/遺囑是CONNECT型別報文中,伴隨客戶端連線服務端的請求一併發出的;
- 可變報頭中包含 Will Flag,Will QoS,Will Retain;其中 QoS 和 Retain 效果同 publish 報文中的 QoS 和 Retain;
- 如果可變報頭連線標識位 Will Flag 等於1,那麼載荷中將包含 Will Topic 和 Will Message 欄位;
※ 模擬遺囑傳送和接收:
由於 mqtt-spy.jar 的無論是點選x
關閉,還是殺死程序,都是正常的Disconnect退出,所以只好寫一個 Java 客戶端來模擬異常退出的場景。
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
public class Main {
public static void main(String[] args) throws MqttException {
String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
MqttClientPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
// 2 表示 EXACTLY_ONCE
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
2, false);
client.connect(options);
}
}
執行這個程式,然後再用 mqtt-spy.jar
模擬一個WEB伺服器上的MQTT客戶端:
Connections -> New Connection 開啟如下圖所示的頁面,輸入Client ID為web,其他都預設,然後 點選Open Connection:
在 Subscriptions and received messages 這一欄點選 New,彈出如下圖所示對話方塊,輸入主題DeviceStatus,然後點選 Subscribe:
然後,我們就可以去關閉 Java 的 MQTT客戶端了。接著,就收到了遺言:
然後,我還找到了 WireShark 抓取的Java的MQTT客戶端發出的Connect報文:
3.1.1 WILL MESSAGE長度限制65535個位元組
從理論上來說,MQTT 中的字串符合以下形式:
用兩個位元組表示內容長度,因此內容長度可以是0到65535個位元組。
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.IntStream;
public class Main {
public static void main(String[] args) throws MqttException, IOException {
String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
MqttClientPersistence persistence = new MemoryPersistence();
MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);
MqttConnectOptions options = new MqttConnectOptions();
StringBuilder sb = new StringBuilder();
IntStream.range(0, 65536).forEach(i -> {
int result = i % 10;
sb.append(result);
});
// 2 表示 EXACTLY_ONCE
byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
options.setWill("DeviceStatus", payload,
2, false);
client.connect(options);
}
}
如上面這段程式碼模擬了 Will Message 為 65536 個位元組,伺服器直接斷開了客戶端的連線:
從圖中可以看出,Will Message 超長了,導致長度為0。具體可以看這段程式碼:
org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect#getPayload
if (willMessage != null) {
encodeUTF8(dos, willDestination);
dos.writeShort(willMessage.getPayload().length); // 這段程式碼再跟進去看,(v >>> 8) & 0xFF 計算等於 0,(v >>> 0) & 0xFF 計算也等於 0
dos.write(willMessage.getPayload());
}
3.1.2 Will Retain只保持最新一條
RETAIN(保持)
1:表示傳送的訊息需要一直持久儲存(不受伺服器重啟影響),不但要傳送給當前的訂閱者,並且以後新來的訂閱了此Topic name的訂閱者會馬上得到推送。
備註:新來乍到的訂閱者,只會取出最新的一個RETAIN flag = 1的訊息推送。
※ 實驗如下:
修改 3.1 中 Main 的程式碼:
// retain 由 false 改為 true
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
2, true);
然後,在 Run Configuration 中拷貝三份 Main,並分別命名為 iot_1,iot_2,iot_3,並且 Program arguments 也分別為 iot_1,iot_2,iot_3:
分別執行 iot_1,iot_2,iot_3,然後再依次結束他們。
然後,再啟動 mqtt-spy.jar,並訂閱主題 DeviceStatus:
如圖所示,我們觀察到新訂閱者只獲取到主題中的最新一條訊息!
3.2 QoS
這個又有很多內容,還是另開一篇 閱讀。
參考文件
MQTT協議筆記之頭部資訊 閱讀
這篇文章主要解答了我對 Length MSB 和 Length LSB 的疑惑
Java MQTT 客戶端之 Paho 閱讀
如果你對 Java 實現 MQTT 客戶端感興趣,可以讀一下這篇