1. 程式人生 > 實用技巧 >301-STM32+BC26基本控制篇-重點詳解-MQTT協議

301-STM32+BC26基本控制篇-重點詳解-MQTT協議

<p><iframe name="ifd" src="https://mnifdv.cn/resource/cnblogs/ZLBC26AA/" frameborder="0" scrolling="auto" width="100%" height="1500"></iframe></p>

先來體驗一下MQTT通訊

1.提示:

可以把MQTT軟體安裝到自己的電腦,也可以安裝在雲伺服器上

如果把MQTT伺服器安裝在自己的電腦上,連線伺服器的IP地址就是自己電腦的IP地址

如果安裝到雲伺服器上,連線伺服器的IP地址就是雲伺服器的IP地址.

2.開啟除錯助手

3.需要開啟兩個,預設連線提供的伺服器測試.

第一個配置如下:
釋出的主題:aaaaa
訂閱的主題:Topic
點選連線,然後點選訂閱

第二個配置如下:
釋出的主題:Topic
訂閱的主題:aaaaa
點選連線,然後點選訂閱

4.第一個軟體發訊息:傳送的訊息123456,然後點擊發送


使用者會看到第二個軟體收到訊息
提示:這個軟體是自己開發的,裡面的顯示都是自己規定的.



其實過程是這樣的: 兩個客戶端都連線了一個MQTT伺服器(這個只是個軟體,後面章節會告訴大家怎麼安裝) 第一個客戶端釋出的主題那一欄填寫的是 aaaaa然後傳送的訊息是 123456 點擊發送的時候實際上該訊息就發給了MQTT伺服器
整個訊息格式呢大概是這樣的 XXXXaaaaaXXXX123456 XXXX呢代表其它資訊,方便伺服器區分出來整個訊息中的 釋出的主題(aaaaa)和釋出的訊息(123456) 其實 aaaaa 就充當了這個訊息的標識

第二個客戶端的訂閱那一項填寫的是 aaaaa 其實就是在告訴伺服器,我需要資料標識是 aaaaa的訊息 既然你告訴了伺服器了,那麼伺服器只要接收到資料標識是 aaaaa 的訊息,那麼就會主動把訊息發給你



5.同理,讓下面的客戶端把訊息發給上面的客戶端


6.簡要說明:

連線上MQTT伺服器以後,只要是兩個裝置之間訂閱和釋出的主題對上了 那麼這兩個裝置就可以通訊了 對於初學者可能疑惑,你軟體點點點到底內部是怎麼做到的
如果你想知道更多就接著看,我先說明一下過程.

其實MQTT就是一個TCP伺服器,它是在TCP通訊的時候封裝了一套協議. 咱們就叫它MQTT協議,注意本質上就是TCP傳輸資料,這個資料有格式而已! 首先是使用TCP連線,然後傳送MQTT連線協議,然後傳送MQTT訂閱主題的協議. 這樣的話,伺服器就知道你需要哪種標識的資料了. 當伺服器收到這種標識的資料的時候,伺服器就會主動轉發給你. 其實MQTT伺服器主要工作就是做資料轉發,但是你需要告訴它你需要什麼樣的資料.

思考

1.其實理解一個東西最好的方式就是:你要設想如果讓你自己做一個這樣的伺服器,你會怎麼做.

2.現在需求是做一個負責資料轉發的軟體

首先,平時的時候咱做的TCP伺服器都是,一個或者多個客戶端連線咱做的TCP伺服器,然後TCP伺服器處理客戶端的資料. 現在呢!需求變了! 假設我有5個網路裝置,3個手機.我現在想讓網路裝置把資料遠端傳給手機.而且我還需要記錄網路裝置上傳的資料. 假設通訊是這樣的(而且後期還會不停的增加裝置和手機)

3.咋辦??? 1. 需要記錄所有裝置的資料 2. 裝置和手機之間存在多對一和一對多,所以,必須需要個公共的伺服器進行資料的中轉. 假設你就把這個伺服器做成TCP伺服器,有人問,你咋不做成UDP呢? UDP是無連線狀態,傳送資料不好判斷是不是傳送成功,我還是少找些麻煩! 還有就是要實現遠端,有個公網IP就可以,可以自己買個伺服器,上網路公司拉一根專網 或者用自己電腦,用花生殼對映 還是用雲伺服器吧!就是執行在別人的伺服器上的一臺電腦(就是一臺電腦),IP地址直接是公網.方便. 4.怎麼設計這個TCP伺服器??? 1.為了應對這種通訊,首先裝置傳送的資料決不能是單單的資料,必須加點東西 2.如果把傳送的資料帶上標識呢? 假設裝置1傳送的資料是: aaaaa資料 (aaaaa是資料標識,後面是真實資料) 3.然後呢!假設手機1就接收資料標識是aaaaa的資料,怎麼讓伺服器轉發給它呢??? 4.如果手機1在連線上TCP伺服器的時候 告訴TCP伺服器我接收資料標識是 aaaaa的資料 5.通過上面的方式是不是有點眉頭了???? 咱呢姑且把 "告訴TCP伺服器我接收資料標識是 aaaaa的資料"這個事情呢,起個名字 訂閱的主題是 aaaaa 把 "假設裝置1傳送的資料是:aaaaa資料 " 訊息前面的 aaaaa 叫做 釋出的主題是aaaaa 5.總結上面的就是 手機1先連線TCP伺服器,然後呢,規定個協議,告訴TCP伺服器我訂閱的主題是aaaaa 這樣呢伺服器就記住了,當出現訊息前面的主題是aaaaa的訊息的時候,他就把這個訊息發給手機1 當然咱假設,裝置1連線上TCP伺服器,然後,告訴TCP伺服器我訂閱的主題是wwww 這樣呢伺服器就記住了,當出現訊息前面的主題是wwww的訊息的時候,他就把這個訊息發給裝置1 然後裝置1連線上TCP伺服器以後呢,這樣傳送資訊(假設傳送的訊息是123456): aaaaa123456 伺服器一接收到客戶端的訊息,就取出來這個訊息的標識是什麼,取出來的是 aaaaa 然後呢,看下記錄的誰需要訊息標識是aaaaa的訊息,然後找到了手機1 最後把這個訊息傳送給手機1這個客戶端,然後手機1就接收到了1123456這個訊息 同理:手機1傳送wwww998877然後這個訊息就會發給裝置1 ,裝置1就會收到 998877 6.總結 這個伺服器道理上是這樣,伺服器記錄各個裝置的資訊,各個裝置訂閱的主題,然後呢,判斷這個訊息然後進行轉發 但是...咱做個簡單的完全可以做出來,但是要想做的完善,而且要支援龐大訊息數量的裝置(來個百萬級).....不是一朝一夕就可以的. 其實很長時間以前,人們就有這種需求了.多對一和一對多通訊 所以呢,一些組織和單位就開始解決這種問題,開始做這種軟體,所以MQTT就誕生了. 之所以叫MQTT是因為是外國人做的這種TCP伺服器,外國人呢,為實現這種功能的TCP伺服器取了個名字叫 Message Queuing Telemetry Transport 然後取每個首字母就叫 MQTT了 其實有很多家做MQTT軟體,但是呢,我比較喜歡用emqtt

來說一下具體的MQTT協議

1,首先咱知道就是個TCP伺服器,所以呢,需要先用TCP連線上他們的伺服器. 2,咱用Android ,C#,QT,網頁等等連線MQTT伺服器的時候有現成的封裝好的庫可以用,其實說白了就是呼叫函式而已..... 3,但是對於微控制器而言要想實現MQTT通訊,那麼就需要藉助網路模組 大部分的網路模組都可以實現TCP通訊,咱呢,就需要在TCP的基礎上按照MQTT協議封裝下咱的資料 注:其實官方給了現成的MQTT的封裝資料和解析資料的程式) https://docs.emqx.io/sdk_tools?category=MQTT_Clients (官方提供的各個開發的庫) 微控制器用下面這個,不過我以前用的這個,因為庫功能很全,佔用記憶體有點大,所以後期使用的是自己重新封裝的.
下面是自己當前使用的mqtt最底層
/**
  ******************************************************************************
  * @author  yang feng wu 
  * @version V1.0.0
  * @date    2019/12/15
  * @brief   
  ******************************************************************************
    
  ******************************************************************************
  */

#define MQTTCLIENT_C_//如果沒有定義

#include "mqtt_msg.h"
#include "string.h"
#include "stm32f10x.h"


#define MQTT_MAX_FIXED_HEADER_SIZE 3

uint16_t mqtt_message_id = 0;

enum mqtt_connect_flag
{
  MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
  MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
  MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
  MQTT_CONNECT_FLAG_WILL = 1 << 2,
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
//__attribute((__packed__))
struct  mqtt_connect_variable_header
{
  uint8_t lengthMsb;
  uint8_t lengthLsb;
  uint8_t magic[4];
  uint8_t version;
  uint8_t flags;
  uint8_t keepaliveMsb;
  uint8_t keepaliveLsb;
};


int mqtt_get_type(unsigned char* buffer) { return (buffer[0] & 0xf0) >> 4; }
int mqtt_get_connect_ret_code(unsigned char* buffer) { return (buffer[3]); }
int mqtt_get_qos(unsigned char* buffer) { return (buffer[0] & 0x06) >> 1; }


int append_string(int *length,unsigned  char* buffer,int buffer_length,unsigned  char* string, int len)
{
  if((*length) + len + 2 > buffer_length)//加上 ClientID 和 記錄 ClientID個數(兩位) 以後超出了陣列
    return -1;

  buffer[(*length)++] = len >> 8;
  buffer[(*length)++] = len & 0xff;
  c_memcpy(buffer + (*length), string, len);
  (*length) += len;
  return len + 2;
}



uint16_t append_message_id(int *length,unsigned  char* buffer,int buffer_length, uint16_t message_id)
{
  // If message_id is zero then we should assign one, otherwise
  // we'll use the one supplied by the caller
  while(message_id == 0)
    message_id = ++mqtt_message_id;

  if((*length) + 2 > buffer_length)
    return 0;

  buffer[(*length)++] = message_id >> 8;
  buffer[(*length)++] = message_id & 0xff;
    
  return message_id;
}


int fini_message(unsigned char **data_ptr,int    length,unsigned char* buffer, int type, int dup, int qos, int retain)
{
  int remaining_length = length - MQTT_MAX_FIXED_HEADER_SIZE;
    
  if(remaining_length > 127)
  {
    buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    buffer[1] = 0x80 | (remaining_length % 128);
    buffer[2] = remaining_length / 128;
    length = remaining_length + 3;
    *data_ptr = buffer;
  }
  else
  {
    buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    buffer[2] = remaining_length;
    length = remaining_length + 2;
    *data_ptr = buffer + 1;
  }

  return length;
}



uint16_t mqtt_get_id(unsigned char* buffer, uint16_t length)
{
  if(length < 1)
    return 0;
    
  switch(mqtt_get_type(buffer))
  {
    case MQTT_MSG_TYPE_PUBLISH:
    {
      int i;
      int topiclen;

      for(i = 1; i < length; ++i)
      {
        if((buffer[i] & 0x80) == 0)
        {
          ++i;
          break;
        }
      }

      if(i + 2 >= length)
        return 0;
      topiclen = buffer[i++] << 8;
      topiclen |= buffer[i++];

      if(i + topiclen >= length)
        return 0;
      i += topiclen;

      if(mqtt_get_qos(buffer) > 0)
      {
        if(i + 2 >= length)
          return 0;
        //i += 2;
      } else {
          return 0;
      }
      return (buffer[i] << 8) | buffer[i + 1];
    }
    case MQTT_MSG_TYPE_PUBACK:
    case MQTT_MSG_TYPE_PUBREC:
    case MQTT_MSG_TYPE_PUBREL:
    case MQTT_MSG_TYPE_PUBCOMP:
    case MQTT_MSG_TYPE_SUBACK:
    case MQTT_MSG_TYPE_UNSUBACK:
    case MQTT_MSG_TYPE_SUBSCRIBE:
    {
      // This requires the remaining length to be encoded in 1 byte,
      // which it should be.
      if(length >= 4 && (buffer[1] & 0x80) == 0)
        return (buffer[2] << 8) | buffer[3];
      else
        return 0;
    }

    default:
      return 0;
  }
}



/**
* @brief   獲取MQTT返回的資料長度(去掉1和2位元組後面資料的長度)
* @param   buffer   MQTT返回的資料首地址
* @param   length   返回的資料個數
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_get_total_length(unsigned char* buffer, uint16_t length)
{
  int i;
  int totlen = 0;

  for(i = 1; i < length; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
    if((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }
  totlen += i;

  return totlen;
}




/**
* @brief   打包連線MQTT指令
* @param   info     MQTT資訊
* @param   data_ptr 打包的資料首地址
* @param   buffer   打包進的陣列
* @param   buffer_length 陣列長度
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_msg_connect(mqtt_connect_info_t* info,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  struct mqtt_connect_variable_header* variable_header;
    
    mqtt_message_id = 0;
    
    length = MQTT_MAX_FIXED_HEADER_SIZE;//頭.連線型別1位,資料個數2位(如果大於127就需要兩位)
    
  if(length + sizeof(*variable_header) > buffer_length)//陣列不夠儲存的
    return 0;
    
  variable_header = (void*)(buffer + length);//把陣列分給這個結構體裡面的變數
  length += sizeof(*variable_header);//儲存完 連線型別,整個資料個數,版本號個數,版本號,等
    
  variable_header->lengthMsb = 0;//版本名稱個數高位
  variable_header->lengthLsb = 4;//版本名稱個數低位
  c_memcpy(variable_header->magic, "MQTT", 4);//版本名稱MQTT
  variable_header->version = 4;//版本號
  variable_header->flags = 0;//先清零
  variable_header->keepaliveMsb = info->keepalive >> 8;//心跳包時間
  variable_header->keepaliveLsb = info->keepalive & 0xff;//心跳包時間

  if(info->clean_session)//清除連線資訊
    variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;

  if(info->client_id != NULL && info->client_id[0] != '\0')//client_id
  {
    if(append_string(&length,buffer,buffer_length, info->client_id, c_strlen(info->client_id)) < 0)//拷貝
      return -1;//陣列不夠用呀...
  }
  else
    return -2;//沒有設定client_id

  if(info->will_topic != NULL && info->will_topic[0] != '\0')//遺囑
  {
    if(append_string(&length,buffer,buffer_length , info->will_topic, c_strlen(info->will_topic)) < 0)//遺囑的主題
      return -3;

    if(append_string(&length,buffer,buffer_length , info->will_message, c_strlen(info->will_message)) < 0)//遺囑的訊息
      return -4;

    variable_header->flags |= MQTT_CONNECT_FLAG_WILL;//需要遺囑
    if(info->will_retain)//遺囑是夠需要伺服器保留
      variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;//保留遺囑
    variable_header->flags |= (info->will_qos & 3) << 3;//遺囑訊息等級
  }
    
  if(info->username != NULL && info->username[0] != '\0')//username
  {
    if(append_string(&length,buffer,buffer_length, info->username, c_strlen(info->username)) < 0)//拷貝使用者名稱
      return -5;

    variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;//有使用者名稱
  }
    
  if(info->password != NULL && info->password[0] != '\0')//password
  {
    if(append_string(&length,buffer,buffer_length, info->password, c_strlen(info->password)) < 0)
      return -6;

    variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;//有密碼
  }

  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);//最終組合連線MQTT的指令
}


/**
* @brief  判斷是否連線上MQTT
* @param  伺服器返回的資料
* @param  
* @retval 0 連線成功
* @example 
**/
int  mqtt_msg_connect_ack(unsigned char *buff)
{
    if(mqtt_get_type(buff) == MQTT_MSG_TYPE_CONNACK)
    {
        return mqtt_get_connect_ret_code(buff);
    }
    return -1;
}


/**
* @brief   斷開連線
* @param   data_ptr 打包的資料首地址
* @param   buffer   打包進的陣列
* @param   buffer_length 陣列長度
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_msg_disconnect(unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}



/**
* @brief   訂閱主題
* @param   topic   訂閱的主題
* @param   qos     訊息等級
* @param   data_ptr 打包的資料首地址
* @param   buffer   打包進的陣列
* @param   buffer_length 陣列長度
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_msg_subscribe_topic(unsigned char* topic, int qos,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;
    
    if(topic == NULL || topic[0] == '\0')
        return -1;
    
    if((mqtt_message_id = append_message_id(&length, buffer, buffer_length, 0)) == 0)
        return -2;
    
    if(append_string(&length, buffer, buffer_length, topic, c_strlen(topic)) < 0)
        return -3;
    
    if(length + 1 > buffer_length)
    return -4;
  buffer[length++] = qos;
    
    return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}



/**
* @brief  判斷是否成功訂閱
* @param  buffer  伺服器返回的資料
* @param  length  伺服器返回的資料長度
* @retval 0:成功  1:失敗
* @example 
**/
int mqtt_msg_subscribe_ack(unsigned char* buffer, uint16_t length)
{
    if(mqtt_get_type(buffer) == MQTT_MSG_TYPE_SUBACK)
    {
        if(mqtt_get_id(buffer,length) == mqtt_message_id)
        {
            return 0;
        }
        else
        {
            return 1;
        }
    }
    else
    {
        return 1;
    }
}


/**
* @brief   釋出訊息
* @param   topic    主題
* @param   data     訊息
* @param   data_length 訊息長度
* @param   qos      訊息等級
* @param   retain   是否需要保留訊息
* @param   data_ptr 打包的資料首地址
* @param   buffer   打包進的陣列
* @param   buffer_length 陣列長度
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_msg_publish(unsigned char* topic,unsigned  char* date, int data_length, int qos, int retain,unsigned  char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;

  if(topic == NULL || topic[0] == '\0')
    return -1;

  if(append_string(&length, buffer, buffer_length, topic, strlen(topic)) < 0)
    return -2;

  if(qos > 0)
  {
    if((mqtt_message_id = append_message_id(&length, buffer, buffer_length,  0)) == 0)
      return -3;
  }
  else
    mqtt_message_id = 0;

  if(length + data_length > buffer_length)
    return -4;
  memcpy(buffer + length, date, data_length);
  length += data_length;

  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}



int mqtt_msg_puback(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}


int mqtt_msg_pubrec(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}


int mqtt_msg_pubrel(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
    
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}


int mqtt_msg_pubcomp(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
  length = MQTT_MAX_FIXED_HEADER_SIZE;
  if(append_message_id(&length, buffer, buffer_length,message_id) == 0)
    return -1;
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}


const char* mqtt_get_publish_topic(unsigned char* buffer, uint16_t* length)
{
  int i;
  int totlen = 0;
  int topiclen;

  for(i = 1; i < *length; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i -1));
    if((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }
  totlen += i;

  if(i + 2 >= *length)
    return NULL;
  topiclen = buffer[i++] << 8;
  topiclen |= buffer[i++];

  if(i + topiclen > *length)
    return NULL;

  *length = topiclen;
  return (const char*)(buffer + i);
}


const char* mqtt_get_publish_data(unsigned char* buffer, uint16_t* length)
{
  int i;
  int totlen = 0;
  int topiclen;
  int blength = *length;
  *length = 0;

  for(i = 1; i < blength; ++i)
  {
    totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
    if((buffer[i] & 0x80) == 0)
    {
      ++i;
      break;
    }
  }
  totlen += i;

  if(i + 2 >= blength)
    return NULL;
  topiclen = buffer[i++] << 8;
  topiclen |= buffer[i++];

  if(i + topiclen >= blength)
    return NULL;

  i += topiclen;

  if(mqtt_get_qos(buffer) > 0)
  {
    if(i + 2 >= blength)
      return NULL;
    i += 2;
  }

  if(totlen < i)
    return NULL;

  if(totlen <= blength)
    *length = totlen - i;
  else
    *length = blength - i;
  return (const char*)(buffer + i);
}

/**
* @brief   打包伺服器返回的心跳包資料(用不到)
* @param   data_ptr 打包的資料首地址
* @param   buffer   打包進的陣列
* @param   buffer_length 陣列長度
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_msg_pingresp(unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;    
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}

/**
* @brief   獲取傳送給伺服器的心跳包資料
* @param   data_ptr 打包的資料首地址
* @param   buffer   打包進的陣列
* @param   buffer_length 陣列長度
* @retval  資料長度 
* @warning None
* @example 
**/
int mqtt_msg_pingreq(unsigned char **data_ptr,unsigned char* buffer,int buffer_length)
{
    int length;
    length = MQTT_MAX_FIXED_HEADER_SIZE;    
  return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}

#ifndef MQTTCLIENT_H_
#define MQTTCLIENT_H_


#ifndef MQTTCLIENT_C_//如果沒有定義
#define MQTTCLIENT_Cx_ extern
#else
#define MQTTCLIENT_Cx_
#endif

#include "string.h"
#include "stm32f10x.h"

#define c_memcpy memcpy
#define c_memset memset
#define c_strlen strlen


enum mqtt_message_type
{
  MQTT_MSG_TYPE_CONNECT = 1,
  MQTT_MSG_TYPE_CONNACK = 2,
  MQTT_MSG_TYPE_PUBLISH = 3,
  MQTT_MSG_TYPE_PUBACK = 4,
  MQTT_MSG_TYPE_PUBREC = 5,
  MQTT_MSG_TYPE_PUBREL = 6,
  MQTT_MSG_TYPE_PUBCOMP = 7,
  MQTT_MSG_TYPE_SUBSCRIBE = 8,
  MQTT_MSG_TYPE_SUBACK = 9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK = 11,
  MQTT_MSG_TYPE_PINGREQ = 12,
  MQTT_MSG_TYPE_PINGRESP = 13,
  MQTT_MSG_TYPE_DISCONNECT = 14
};

enum mqtt_connack_return_code
{
    MQTT_CONN_FAIL_SERVER_NOT_FOUND = -5,
    MQTT_CONN_FAIL_NOT_A_CONNACK_MSG = -4,
    MQTT_CONN_FAIL_DNS = -3,
    MQTT_CONN_FAIL_TIMEOUT_RECEIVING = -2,
    MQTT_CONN_FAIL_TIMEOUT_SENDING = -1,
    MQTT_CONNACK_ACCEPTED = 0,
    MQTT_CONNACK_REFUSED_PROTOCOL_VER = 1,
    MQTT_CONNACK_REFUSED_ID_REJECTED = 2,
    MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3,
    MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS = 4,
    MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5
};



//連線MQTT指令 
typedef struct mqtt_connect_info
{
  unsigned  char* client_id;
  unsigned  char* username;
  unsigned  char* password;
  unsigned  char* will_topic;
  unsigned  char* will_message;
  int keepalive;
  int will_qos;
  int will_retain;
  int clean_session;
    
} mqtt_connect_info_t;


int mqtt_get_type(unsigned char* buffer);
int mqtt_get_connect_ret_code(unsigned char* buffer);
int mqtt_get_qos(unsigned char* buffer);
uint16_t mqtt_get_id(unsigned char* buffer, uint16_t length);

int mqtt_msg_connect(mqtt_connect_info_t* info,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_connect_ack(unsigned char *buff);
int mqtt_msg_subscribe_topic(unsigned char* topic, int qos,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_subscribe_ack(unsigned char* buffer, uint16_t length);
int mqtt_msg_publish(unsigned char* topic,unsigned char* date, int data_length, int qos, int retain,unsigned  char **data_ptr,unsigned char* buffer,int buffer_length);

int mqtt_get_total_length(unsigned char* buffer, uint16_t length);

int mqtt_msg_puback(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_pubrel(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_pubrec(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);
int mqtt_msg_pubcomp(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length);

const char* mqtt_get_publish_topic(unsigned char* buffer, uint16_t* length);
const char* mqtt_get_publish_data(unsigned char* buffer, uint16_t* length);

int mqtt_msg_pingreq(unsigned char **data_ptr,unsigned char* buffer,int buffer_length);

#endif

4.咱利用網路模組的TCP連線上以後 然後需要傳送第一條訊息(注:並不是上來就可以訂閱主題的) MQTT軟體規定呢,你傳送的第一條資訊是連線資訊(相當於咱要先登入) 他規定了幾個引數! ClientID: 各個客戶端必須設定一個ID,各個客戶端必須都不一樣 假設是 123456 使用者名稱: 咱安裝MQTT軟體的時候可以設定MQTT軟體的登入的使用者名稱 假設是yang 密碼: 咱安裝MQTT軟體的時候可以設定MQTT軟體的登入的密碼 假設是 11223344

測試MQTT連線協議

1.以下協議是我為了能夠讓大家好理解整個MQTT協議,所以再次做了精簡(切勿使用下面的作為工程專案)
/**
* @brief  連線伺服器的打包函式
* @param  
* @retval 
* @example 
**/
int ConnectMqtt(char *ClientID,char *Username,char *Password)
{
    int ClientIDLen = strlen(ClientID);
    int UsernameLen    = strlen(Username);
    int PasswordLen = strlen(Password);
    int DataLen = 0;
    int Index = 2;
    int i = 0;
    DataLen = 12 + 2+2+ClientIDLen+UsernameLen+PasswordLen;
    MqttSendData[0] = 0x10;                //MQTT Message Type CONNECT
    MqttSendData[1] = DataLen;    //剩餘長度(不包括固定頭部)
    MqttSendData[Index++] = 0;        // Protocol Name Length MSB    
    MqttSendData[Index++] = 4;        // Protocol Name Length LSB    
    MqttSendData[Index++] = 'M';        // ASCII Code for M    
    MqttSendData[Index++] = 'Q';        // ASCII Code for Q    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 'T';        // ASCII Code for T    
    MqttSendData[Index++] = 4;        // MQTT Protocol version = 4    
    MqttSendData[Index++] = 0xc2;        // conn flags 
    MqttSendData[Index++] = 0;        // Keep-alive Time Length MSB    
    MqttSendData[Index++] = 60;        // Keep-alive Time Length LSB  60S心跳包  
    MqttSendData[Index++] = (0xff00&ClientIDLen)>>8;// Client ID length MSB    
    MqttSendData[Index++] = 0xff&ClientIDLen;    // Client ID length LSB  

    for(i = 0; i < ClientIDLen; i++)
    {
        MqttSendData[Index + i] = ClientID[i];          
    }
    Index = Index + ClientIDLen;
    
    if(UsernameLen > 0)
    {   
        MqttSendData[Index++] = (0xff00&UsernameLen)>>8;//username length MSB    
        MqttSendData[Index++] = 0xff&UsernameLen;    //username length LSB    
        for(i = 0; i < UsernameLen ; i++)
        {
            MqttSendData[Index + i] = Username[i];    
        }
        Index = Index + UsernameLen;
    }
    
    if(PasswordLen > 0)
    {    
        MqttSendData[Index++] = (0xff00&PasswordLen)>>8;//password length MSB    
        MqttSendData[Index++] = 0xff&PasswordLen;    //password length LSB    
        for(i = 0; i < PasswordLen ; i++)
        {
            MqttSendData[Index + i] = Password[i];    
        }
        Index = Index + PasswordLen; 
    }    
    return Index;
}


假設我ClientID填寫的是:123456 UserName填寫的是:yang Password填寫的是:11223344 執行以後得到以下資料 10 22 00 04 4D 51 54 54 04 C2 00 78 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34 然後把這個資料發給TCP 伺服器,如果沒有錯誤,伺服器就會回 20 02 00 00 咱可以用TCP除錯助手試一試 IP地址:47.92.31.46注意:IP地址可能不能不能使用了 如果IP不可以連線可以填域名 mnif.cn
埠號:1883

先說一件事情所有的MQTT資料哈第一個位元組是說明整個資料是幹什麼的資料 第二個位元組是說它後面的資料的總個數 10 : 固定,MQTT規定的連線用0x10 22: 是說0x22後面有0x22個數據34個 00 04: 後面記錄MQTT版本號的位元組個數 4D 51 54 54: M Q T T版本號字元 這個是4版本,不同版本不一樣 3版本的是MQIsdp 額,瞭解就可以 04: 版本號是 0x04 C2:這個呢想了解具體呢,需要看協議http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028

C2 的二進位制是 1100 0010

bit7 bit6:是否有使用者名稱和密碼 bit5 :遺囑是否需要伺服器保留 bit4 bit3:遺囑的訊息等級 bit2:是否設定了遺囑 bit1:是否清除以前的連線資訊 bit0:保留,預設0 上面就是說有使用者名稱和密碼,每次連線的時候清除連線資訊,沒有設定遺囑(後面會說)


00 78: 心跳包是120S一次(這個自己連線的時候自己設定), MQTT規定客戶端必須發心跳包,客戶端傳送的心跳包資料是 0xC0 0x00,這是MQTT規定的 如果心跳包間隔了你設定心跳包的1.5倍時間,你沒有發給伺服器,伺服器就認為你掉線了,這是關於遺囑的問題,,後面會說 你發給伺服器 0xC0 0x00伺服器會回你 0xD00x00這個知道就行了

00 06:客戶端的ClientId有6位 後面的31 32 33 34 35 36 就是ClientId ,這是MQTT伺服器規定的,每個客戶端必須有各自的ClientId

00 04: MQTT的使用者名稱 79 61 6E 67我安裝MQTT的時候設定的MQTT的使用者名稱是yang

00 08: MQTT的密碼 31 31 32 32 33 33 34 34我安裝MQTT的時候設定的MQTT密碼

好了,總結下就是連線上TCP伺服器 然後傳送 10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34 伺服器呢就會回你20 02 00 00 20: 固定 02: 後面有兩個資料 00 00 注意後面的第一個資料 00 ,如果你設定了 Clean Session為1 :便會回覆 01

最後一個數據呢,有幾個返回值,0就說明成功,其它就是有各種問題 比如說回的是 20 02 00 04就說明使用者名稱或者密碼有問題.


訂閱主題

假設告訴伺服器我訂閱的是2222 假設訂閱的時候訂閱的主題的訊息標識是1,訊息等級是0 那麼打包以後就是 82 09 00 01 00 04 32 32 32 32 00然後把這個資料發給TCP伺服器

讓測試用TCP除錯助手訂閱,然後用咱的MQTT除錯助手發信息給咱的TCP除錯助手 注意:現在咱的TCP可能已經斷開了,因為咱的TCP除錯助手沒有在規定時間內傳送心跳包 首先準備好除錯助手


首先連線 10 22 00 04 4D 51 54 54 04 C2 00 03 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34


然後訂閱 82 09 00 01 00 04 32 32 32 32 00


然後用MQTT除錯助手發訊息




/**
* @brief  MQTT訂閱/取消訂閱資料打包函式
* @param  SendData 
* @param  topic                主題 
* @param  qos         訊息等級 
* @param  whether     訂閱/取消訂閱請求包
* @retval 
* @example 
**/
int MqttSubscribeTopic(char *topic,u8 qos,u8 whether)
{    
    int topiclen = strlen(topic);
    int i=0,index = 0;
  
    if(whether)
        MqttSendData[index++] = 0x82;                        //0x82 //訊息型別和標誌 SUBSCRIBE 訂閱
    else
        MqttSendData[index++] = 0xA2;                        //0xA2 取消訂閱
    MqttSendData[index++] = topiclen + 5;                //剩餘長度(不包括固定頭部)
    MqttSendData[index++] = 0;                          //訊息識別符號,高位
    MqttSendData[index++] = 0x01;                    //訊息識別符號,低位
    MqttSendData[index++] = (0xff00&topiclen)>>8;    //主題長度(高位在前,低位在後)
    MqttSendData[index++] = 0xff&topiclen;              //主題長度 
    
    for (i = 0;i < topiclen; i++)
    {
        MqttSendData[index + i] = topic[i];
    }
    index = index + topiclen;
    
    if(whether)
    {
        MqttSendData[index] = qos;//QoS級別
        index++;
    }
    return index;
}

假設上面的MqttSubscribeTopic("2222",0,1)

0x82: 告訴MQTT伺服器,我要訂閱主題 0x09: 後面的資料個數 0x000x01注意哈,訂閱主題的時候可以設定了標識標識呢1-65535 之所以有這個傢伙:咱訂閱的時候怎麼判斷訂閱成功了呢??? 訂閱成功以後呢!伺服器會返回咱訂閱成功的回覆,回覆裡面就包含著咱寫的這個標識 咱呢可以對比下這個標識,然後呢就知道到底是不是訂閱成功了. 0x000x04後面訂閱主題的長度 32 32 32 32 訂閱的主題是 2222 最後一個 00 是說訊息等級,一般呢,訂閱設定為0 就可以 那就說一下這個訊息等級有什麼用吧! 咱傳送資料的時候也會攜帶一個訊息等級 假設是0那麼這條訊息是不是真的發給MQTT伺服器(Broker)了,就不知道了,

假設是1 那麼一個客戶端傳送訊息以後呢,伺服器一看訊息等級是1,那麼就會回給那個傳送訊息的客戶端一個應答訊息 客戶端可以根據有沒有回覆應答確認發沒傳送成功

假設是2 這個呢伺服器和客戶端之間會有雙向的應答!後面會詳細說.

如果按照上面發呢,伺服器會回 90 03 00 01 00 90:固定 03:後面的資料長度 00 01:這條主題的標識 00:訊息等級

如果訂閱多個主題假設訂閱兩個主題 訊息等級第一個是0 第二個是1 90 04 00 01 00 01 90:固定 03:後面的資料長度 00 01:這條主題的標識 00:訊息等級 01:訊息等級

假設訂閱失敗 後面的訊息等級就會變為 0x80 (訂閱一個主題) 90 03 00 01 00 90:固定 03:後面的資料長度 00 01:這條主題的標識 80:訊息等級變為0x80



訂閱兩個主題,第一個訂閱失敗 後面的訊息等級就會變為 0x80 90 03 00 01 00 90:固定 04:後面的資料長度 00 01:這條主題的標識 80:訊息等級 00:訊息等級

釋出訊息

釋出的時候呢,資訊裡面都有以下內容 釋出的主題,訊息,訊息等級,是不是需要伺服器保留訊息,訊息的標識
/**
* @brief  MQTT釋出資料打包函式
* @param  mqtt_message 
* @param  topic                主題 
* @param  qos         訊息等級 
* @retval 
* @example 
**/
int MqttPublishData(char * topic, char * message, u8 qos)
{  
    int topic_length = strlen(topic);    
    int message_length = strlen(message);  
    int i,index=0;    
    static u16 id=0;
    
    MqttSendData[index++] = 0x30;    // MQTT Message Type PUBLISH  30:訊息等級是0  32訊息等級是1  34訊息等級是2

  
    if(qos)
        MqttSendData[index++] = 2 + topic_length + 2 + message_length;//資料長度
    else
        MqttSendData[index++] = 2 + topic_length + message_length;   // Remaining length  

  
    MqttSendData[index++] = (0xff00&topic_length)>>8;//主題長度
    MqttSendData[index++] = 0xff&topic_length;
         
    for(i = 0; i < topic_length; i++)
    {
        MqttSendData[index + i] = topic[i];//拷貝主題
    }
    index += topic_length;
        
    if(qos)
    {
        MqttSendData[index++] = (0xff00&id)>>8;
        MqttSendData[index++] = 0xff&id;
        id++;
    }
  
    for(i = 0; i < message_length; i++)
    {
        MqttSendData[index + i] = message[i];//拷貝資料
    }
    index += message_length;
        
    return index;
}

釋出的主題: 誰訂閱了這個主題,伺服器就會把相應的訊息傳給誰 訊息等級:上面說了 是不是需要伺服器保留訊息:一會和遺囑一塊說 訊息的標識:每條訊息加個標識,用來區分訊息

遺囑

還記得上面

我直接說遺囑是啥意思哈! 假設我手機和一個裝置訂閱主題和釋出主題對應,我就能和這個裝置通訊了 但是,我怎麼知道這個裝置掉線了呢? 當然完全可以自己發信息給那個裝置,如果不回覆,就說明掉線了 但是呢!MQTT伺服器提供了一種方式 假設我設定好一個裝置的遺囑訊息是offline 遺囑釋出的主題是 aaaaa 另一個裝置訂閱的主題是 aaaaa 如果裝置掉線,伺服器就會給訂閱了aaaaa的裝置傳送offline

還記得上面說的不 伺服器如果在你設定的心跳包時間的1.5倍收不到心跳包就認為你掉線了. 當然訂閱系統主題也可以,這個後面再說.

心跳包

MQTT規定的,傳送完連線協議之後 傳送的心跳包資料是C0 00 傳送時間:連線協議裡面的心跳包時間(你可以提前發) 然後伺服器回覆 D0 00

接收所有裝置資料

1.有人會問,如果我想監控所有裝置的資料應該怎麼做

就是說,我有個所有裝置都可以管理的後臺 假設我是用C#做了一個MQTT的上位機,監控所有的資料 笨法: 你訂閱的時候把所有裝置釋出的主題全部訂閱一遍 假設現在其中一個裝置,想獲取其它連個裝置的資料 其它兩個裝置釋出的主題如下:

另一個裝置 訂閱 aaaaa 然後再訂閱 wwww



然後就可以了



另一個客戶端

MQTT自帶的絕招: 先說一下哈 假設一個客戶端釋出的主題是 tttt/aaaaa 還有一個客戶端釋出的主題是 tttt/wwww 如果想讓有一個客戶端接收他倆的資料 你只需要訂閱tttt/#



2.上面的僅僅侷限於wwww/XXX的裝置,真正接收所有裝置資料 把客戶端放到安裝MQTT軟體的那臺伺服器上,IP地址填寫 localhost 訂閱的主題填寫 #

只所以必須在伺服器才可以. 是因為這個許可權控制

允許 IP地址(127.0.0.1) 訂閱和釋出的主題 $SYS/# #

用系統主題監控裝置上下線

1.在伺服器上訂閱 $SYS/#

在剛一執行訂閱

$SYS/brokers : 叢集節點列表

$SYS/brokers/[email protected]/sysdescr 伺服器描述

$SYS/brokers/[email protected]/version 伺服器版本

咱主要關注的是下面的

會接收到這個主題: $SYS/brokers/[email protected]/datetime

這個主題的訊息是: 2020-12-17 13:06:16

這個主題每隔1S釋出一次時間,這個是系統自帶的

會接收到這個主題: $SYS/brokers/[email protected]/uptime

這個主題的訊息是: 175 days,18 hours, 52 minutes, 19 seconds

這個主題每隔1S釋出一次時間,這個是MQTT伺服器啟動執行的時間

某個裝置上線,下面是clientid為 e28d35c7-8 的裝置上線了

$SYS/brokers/[email protected]/clients/e28d35c7-8/connected

某個裝置掉線,下面是clientid為acdd2b6a-e的裝置掉線了

$SYS/brokers/[email protected]/clients/acdd2b6a-e/disconnected

2.監控所有裝置上下線只需要

監控裝置上線,訂閱:$SYS/brokers/[email protected]/clients/+/connected

監控裝置下線,訂閱:$SYS/brokers/[email protected]/clients/+/disconnected

提示:上面的加號代表任意.這也是MQTT的一個招式.

MQTT訊息等級和DUP

1.假設客戶端1 釋出的主題是 1111 ;訊息等級是:0;傳送的訊息是999最終傳送的資訊如下: 30 0b 00 04 31 31 31 31 39 39 39 訊息等級是0是說明該訊息傳送出去就完事了,伺服器不會回覆任何應答資訊. 至於該訊息發沒發給伺服器,不知道! 假設客戶端2 訂閱的主題是:1111訊息等級是 0 假設客戶端1 確實把訊息發給了伺服器 客戶端2 收到訊息以後,不需要做任何操作      

2.假設客戶端1 釋出的主題是 1111 ;訊息等級是:1;傳送的訊息是999最終傳送的資訊如下: 32 0b 00 04 31 31 31 31 XX XX 39 39 39 XX XX是在傳送的時候需要加上的訊息識別符號: 訊息識別符號XX XX隨意即可:範圍1-65535 假設訊息識別符號是 00 01

傳送完以上訊息以後,伺服器會回覆: (PUBACK) 告訴客戶端我收到了 40 02 00 01 (00 01就是咱上面傳送的訊息識別符號) 這樣就證明訊息確實送達給了伺服器

如果客戶端1 釋出完訊息以後沒有接收到伺服器的應答 則可以重新發布訊息 32 0b 00 04 31 31 31 31 XX XX 39 39 39 XX XX可以和上次的一樣,也可以不一樣

3.假設客戶端2訂閱了主題是 1111 ;訊息等級是:1 伺服器接收到客戶端1傳送的訊息之後,轉發給客戶端2 32 0b 00 04 31 31 31 31 XX XX 39 39 39 注意現在的XX XX(訊息識別符號)是伺服器自己隨機生成的了 假設識別符號是 00 02 客戶端2在接收到訊息之後需要返回應答(PUBACK) 告訴伺服器我收到了 40 02 00 02

如果客戶端2不回覆:40 02 00 02(後面咱就叫 PUBACK) 伺服器便會一直髮送訊息給客戶端2 3A 0b 00 04 31 31 31 31 00 02 39 39 39 注意開頭變為了 3A (伺服器自動會把重傳標誌置一)

高4位是 3 固定 後面四位: 第一位:DUP標記這條訊息是不是重傳的 第2,3位:訊息等級01:訊息等級1 10:訊息等級2 最後一位:RETAIN 是否需要伺服器保留這條訊息 本來是 32 0011 0010 變為了 3A 0011 1010

其實伺服器加上DUP是為了讓客戶端知道,我這條訊息是重傳的, 因為伺服器第一次發的時候客戶端沒有返回PUBACK,但是伺服器知道我確實是傳給了客戶端 客戶端這邊假設真的是沒有及時的回覆PUBACK,那麼有兩種方式處理 1.客戶端再次接收到訊息以後,無論訊息有沒有DUP標誌,直接處理訊息 如果判斷這條訊息是需要返回 PUBACK的,那麼直接根據訊息裡面的訊息識別符號返回 PUBACK 即可 2.判斷下如果有DUP標誌,那麼再提取下訊息標識,看一下我先前是不是處理了有相同訊息識別符號的訊息 如果有就說明我已經處理了,只是沒有返回PUBACK,那麼我不去處理這條訊息 直接根據訊息裡面的訊息識別符號返回PUBACK就可以

  2.2.3 其實....   但是整體來說,對於訊息等級是1的訊息統統處理即可   然後根據訊息裡面的訊息識別符號返回PUBACK即可   先說一下為什麼   其實在客戶端1釋出訊息等級是1的訊息的時候,   如果客戶端1由於某些原因沒有接收到伺服器的PUBACK   那麼客戶端1還會再發布先前的訊息   其實現在就有兩條或者多條相同的訊息在伺服器裡面   這些相同的訊息(識別符號不一樣的訊息)就會發給客戶端2   如果客戶端2一直不應答(PUBACK),那麼伺服器便會把所有的沒有收到應答的訊息   的DUP標記置一以後不停的發給客戶端2...   直至客戶端2應答了所有的訊息,或者客戶端2斷線了   伺服器才停止傳送   對於微控制器而言,這些處理只能自己去實現   為了方便和節省記憶體,對於訊息等級是1的訊息   可以直接根據訊息裡面的訊息識別符號返回PUBACK

  所以對於訊息等級是1的訊息,其實客戶端至少會接收到1次訊息




3.1 假設客戶端1 釋出的主題是 1111 ;訊息等級是:2;傳送的訊息是999最終傳送的資訊如下:

  34 0b 00 04 31 31 31 31 XX XX 39 39 39   XX XX是在傳送的時候需要加上的訊息識別符號:   訊息識別符號XX XX隨意即可:範圍1-65535   假設訊息識別符號是 00 01   注意:伺服器接收到此訊息以後並不會立即傳送給訂閱了主題是1111,訊息等級是2的客戶端

  伺服器接收到以後會返回: PUBREC) "告訴客戶端我收到了"   50 02 00 01

  客戶端1需要返回: PUBREL) "好的"   62 02 00 01   注意:返回這個以後,訊息才會下發給訂閱了主題是1111,訊息等級是2的客戶端

  伺服器接著會返回: PUBCOMP)   70 02 00 01



  很多人介紹QS2都說保證訊息只傳輸一次,其實實際上是這樣保證的   客戶端1在傳送完訊息以後   34 0b 00 04 31 31 31 31 XX XX 39 39 39   伺服器會返回(PUBREC)

  但是隻要客戶端1不回覆 (PUBREL)   無論客戶端1現在傳送多少條訊息等級是2的訊息   伺服器都不會理會,伺服器只會記錄你傳送的最後一條訊息   客戶端1只有回覆了(PUBREL)   伺服器才會把最後一條訊息轉發出去   最後返回 (PUBCOMP)




3.2 假設客戶端2訂閱了主題是 1111 ;訊息等級是:2

  伺服器接收到客戶端1傳送的訊息,然後確認接收到客戶端1的(PUBREL)之後,   轉發給客戶端2 :

  34 0b 00 04 31 31 31 31 XX XX 39 39 39

  注意現在的XX XX(訊息識別符號)是伺服器自己隨機生成的了

  假設識別符號是 00 02

  客戶端2接收到以後需要返回: PUBREC) "告訴伺服器我收到了"    50 02 00 02   注意:如果客戶端2不回覆: PUBREC),那麼伺服器會不停的傳送   34 0b 00 04 31 31 31 31 XX XX 39 39 39   直至客戶端2回覆: PUBREC)



  伺服器接收到以後會返回: PUBREL) "好的"   62 02 00 02

  客戶端2最後需要返回: PUBCOMP)   70 02 00 02   注意:即使客戶端2不返回(PUBCOMP),伺服器隔一段時間也會預設客戶端2回覆了(PUBCOMP)



十,補充(關於retain)





釋出訊息的時候,第一個位元組的最後一位代表 Retain 意思是,是否需要伺服器保留這個訊息 如果設定了伺服器保留了這個訊息,那麼只要客戶端訂閱了這個訊息的主題 伺服器就會立馬傳送給客戶端保留的這個訊息 詳細說明: 假設我釋出的主題是:1111 訊息是:999 ,訊息等級隨意(假設是0)然後設定了 Retain位置為1 31 09 00 04 31 31 31 31 39 39 39 客戶端1把這條訊息發給了伺服器 然後過了一會,客戶端2上線了 訂閱了主題 1111 因為上面的訊息設定了讓伺服器保留 所以只要客戶端2 一訂閱1111,便會收到 999這個訊息 這就是Retain的作用



當然,有更巧妙的應用 我一般用這個來發送線上或者掉線,或者開關量資料 1.我設定客戶端1的遺囑釋出的主題是 1111遺囑訊息是 offline Retain為1 2.我設定客戶端1連線上伺服器以後先發布一條訊息 釋出的主題也是 1111訊息是: online Retain為1

注意:伺服器最終只會保留最後一條需要保留的訊息

只要是另一個客戶端訂閱 1111 如果客戶端1是掉線的,那麼便會立即收到 offline 如果客戶端1是線上的,呢麼便會立即收到 online

有些時候咱控制開關,咱開啟上位機以後想立即知道開關的狀態 最好的方式就是把傳送開關亮資料的主題 Retain設定為1 那麼只要是上位機一訂閱裝置釋出的開關量主題, 便會立即得到開關量資料! 這樣便提高了使用者體驗.