1. 程式人生 > 其它 >MQTT3.1.1協議閱讀筆記1

MQTT3.1.1協議閱讀筆記1

本文主要是記錄閱讀 MQTT3.1.1協議中文版 時的心得感悟。

環境資訊

  1. 使用Docker執行emqx,作為MQTT的服務端
  2. 使用mqtt-spy.jar作為MQTT的客戶端
  3. 使用Paho寫一個簡單的Java-MQTT客戶端
  4. 使用WireShark進行協議抓包

MQTT 簡介

MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基於釋出/訂閱(publish/subscribe)模式的"輕量級"通訊協議,該協議構建於 TCP/IP 協議之上,由IBM在1999年釋出。。

一個 MQTT 控制報文包含三個部分:

組成部分 長度
固定報頭 2-5個位元組 存在於所有MQTT控制包
可變報頭 存在於某些MQTT控制包
載荷 存在於某些MQTT控制包
  • 我們藉助 MQTT 協議傳送的訊息內容儲存在載荷中

1.固定報頭

固定報頭由兩部分組成:控制包型別和剩餘長度

  • 控制包型別目前有 14 種;
  • 剩餘長度表示的是“可變報頭+載荷”的總長度

如上圖所示,這是一條控制包型別為 CONNECT 的 MQTT 報文,固定報頭的中 剩餘長度用16進製表示為 0x1e,用10進製表示為 30。

從 1e 的後一個位元組 00 到末尾剛好是 30 個位元組。

1.1 剩餘長度與控制包最大長度256M

剩餘長度使用了一種可變長度的結構

來編碼,這種結構使用單一位元組表示0-127的值。大於127的值如下處理。每個位元組的低7位用來編碼資料,最高位用來表示是否還有後續位元組。剩餘長度最多可以用四個位元組來表示。

用n個位元組表示剩餘長度 剩餘長度範圍起始值 剩餘長度範圍結束值
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

這將允許應用傳送可變報頭和載荷總長度為255M大小

的控制包。這個數字用16進製表示為:0xFF,0xFF,0xFF,0x7F。

換句話說,這將允許應用傳送最多256M大小的控制包。

2.可變報頭

以 CONNECT 報文的可變報頭為例,主要包含協議名稱(MQTT)和協議版本號(v3.1.1對應4);

2.1 MSB 和 LSB

至於 Length MSB(Most Significant Bit,最高有效位) 和 Length LSB (Last/Least Significant Bit,譯作最低有效位),

把 MSB 和 LSB 用大端位元組/網路位元組序來讀取,讀取的值可以表示協議名稱的長度。

以下是 Java 寫成的 Demo:

import java.io.*;
import java.util.Arrays;

public class Utf8Characters {

    public static void main(String[] args) throws IOException {
        // 模擬寫入
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(baos);

        dataOut.writeUTF("MQTT");

        byte[] bytes = baos.toByteArray();
        System.out.println(Arrays.toString(bytes)); // 列印 [0, 4, 77, 81, 84, 84]

        // 模擬讀取
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(bais);

        int len = dataIn.readUnsignedShort(); // 2 bytes
        byte[] decodedString = new byte[len]; // 4 bytes
        dataIn.read(decodedString);
        String target = new String(decodedString, "UTF-8");
        System.out.println(target); // 列印 MQTT

        // 重置一下,重新讀取
        dataIn.reset();
        // 等同於
        String result = dataIn.readUTF();
        System.out.println(result); // 列印 MQTT

    }
}

3. MQTT的特別之處

我們在學習TCP/IP協議的時候,就知道 ACK 這個概念,其實許多構建在TCP/IP協議之上的應用層協議也都會使用 XXXACK 包來表示已經成功接收 XXX 資訊。
MQTT也不能“免俗”:

  • 連線報文 CONNECT 對應連線確認報文 CONNACK;
  • 訂閱報文 SUBSCRIBE 對應訂閱確認報文 SUBACK;
  • 取消訂閱報文 UNSUBSCRIBE 對應取消訂閱確認報文 UNSUBACK;
  • 釋出報文 PUBLISH 對應釋出確認報文 PUBACK。

還有就是名字中沒有使用 ACK,但是實際上也是“一問一答”式的 PINGREQ 和 PINGRESP。但是 MQTT 的控制型別中還是有兩處“怪異之處”

  • 唯獨 DISCONNECT 沒有對應的確認報文;
  • PUBLISH 除了有 PUBACK 之外,還有 PUBREC,PUBREL,PUBCOMP

3.1 遺言/遺囑Will

對於一般IoT裝置而言,就是一個大迴圈while不斷接收訊息,不存在正常退出的邏輯,一般都是斷電斷網導致的異常退出。DISCONNECT 並不常用,也不用確認。

但是,如果客戶端正常發出了 DISCONNECT 報文,那麼服務端收到 DISCONNECT 後必須丟棄所有和當前連線有關的Will Message,不釋出。

我們通常都有判斷IoT裝置是否線上的需求,使用遺言機制就很好實現。

  • 遺言/遺囑是CONNECT型別報文中,伴隨客戶端連線服務端的請求一併發出的;
  • 可變報頭中包含 Will Flag,Will QoS,Will Retain;其中 QoS 和 Retain 效果同 publish 報文中的 QoS 和 Retain;
  • 如果可變報頭連線標識位 Will Flag 等於1,那麼載荷中將包含 Will Topic 和 Will Message 欄位;

模擬遺囑傳送和接收

由於 mqtt-spy.jar 的無論是點選x關閉,還是殺死程序,都是正常的Disconnect退出,所以只好寫一個 Java 客戶端來模擬異常退出的場景。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

public class Main {

    public static void main(String[] args) throws MqttException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);

        MqttConnectOptions options = new MqttConnectOptions();
        // 2 表示 EXACTLY_ONCE
        options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
                2, false);
        client.connect(options);
    }
}

執行這個程式,然後再用 mqtt-spy.jar 模擬一個WEB伺服器上的MQTT客戶端:

Connections -> New Connection 開啟如下圖所示的頁面,輸入Client ID為web,其他都預設,然後 點選Open Connection

在 Subscriptions and received messages 這一欄點選 New,彈出如下圖所示對話方塊,輸入主題DeviceStatus,然後點選 Subscribe

然後,我們就可以去關閉 Java 的 MQTT客戶端了。接著,就收到了遺言:

然後,我還找到了 WireShark 抓取的Java的MQTT客戶端發出的Connect報文:

3.1.1 WILL MESSAGE長度限制65535個位元組

從理論上來說,MQTT 中的字串符合以下形式:

用兩個位元組表示內容長度,因此內容長度可以是0到65535個位元組。

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.IntStream;

public class Main {

    public static void main(String[] args) throws MqttException, IOException {
        String clientId = Arrays.stream(args).findFirst().orElse(UUID.randomUUID().toString());
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", clientId, persistence);

        MqttConnectOptions options = new MqttConnectOptions();
        StringBuilder sb = new StringBuilder();
        IntStream.range(0, 65536).forEach(i -> {
                int result = i % 10;
                sb.append(result);
        });
        // 2 表示 EXACTLY_ONCE
        byte[] payload = sb.toString().getBytes(StandardCharsets.UTF_8);
        options.setWill("DeviceStatus", payload,
                2, false);
        client.connect(options);
    }
}

如上面這段程式碼模擬了 Will Message 為 65536 個位元組,伺服器直接斷開了客戶端的連線:

從圖中可以看出,Will Message 超長了,導致長度為0。具體可以看這段程式碼:org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect#getPayload

if (willMessage != null) {
  encodeUTF8(dos, willDestination);
  dos.writeShort(willMessage.getPayload().length); // 這段程式碼再跟進去看,(v >>> 8) & 0xFF 計算等於 0,(v >>> 0) & 0xFF 計算也等於 0
  dos.write(willMessage.getPayload());
}

3.1.2 Will Retain只保持最新一條

RETAIN(保持)
1:表示傳送的訊息需要一直持久儲存(不受伺服器重啟影響),不但要傳送給當前的訂閱者,並且以後新來的訂閱了此Topic name的訂閱者會馬上得到推送。
備註:新來乍到的訂閱者,只會取出最新的一個RETAIN flag = 1的訊息推送。

※ 實驗如下:
修改 3.1 中 Main 的程式碼:

// retain 由 false 改為 true
options.setWill("DeviceStatus", ("{\"device\":\""+ clientId + "\",\"state\":\"offline\"}").getBytes(StandardCharsets.UTF_8),
                2, true);

然後,在 Run Configuration 中拷貝三份 Main,並分別命名為 iot_1,iot_2,iot_3,並且 Program arguments 也分別為 iot_1,iot_2,iot_3:

分別執行 iot_1,iot_2,iot_3,然後再依次結束他們。

然後,再啟動 mqtt-spy.jar,並訂閱主題 DeviceStatus:

如圖所示,我們觀察到新訂閱者只獲取到主題中的最新一條訊息!

3.2 QoS

這個又有很多內容,還是另開一篇 閱讀

參考文件

MQTT協議筆記之頭部資訊 閱讀

這篇文章主要解答了我對 Length MSB 和 Length LSB 的疑惑

Java MQTT 客戶端之 Paho 閱讀

如果你對 Java 實現 MQTT 客戶端感興趣,可以讀一下這篇