301-STM32+BC26基本控制篇-重點詳解-MQTT協議
<p><iframe name="ifd" src="https://mnifdv.cn/resource/cnblogs/ZLBC26AA/" frameborder="0" scrolling="auto" width="100%" height="1500"></iframe></p>
先來體驗一下MQTT通訊
1.提示:
可以把MQTT軟體安裝到自己的電腦,也可以安裝在雲伺服器上
如果把MQTT伺服器安裝在自己的電腦上,連線伺服器的IP地址就是自己電腦的IP地址
如果安裝到雲伺服器上,連線伺服器的IP地址就是雲伺服器的IP地址.
2.開啟除錯助手
3.需要開啟兩個,預設連線提供的伺服器測試.
第一個配置如下:
釋出的主題:aaaaa
訂閱的主題:Topic
點選連線,然後點選訂閱
第二個配置如下:
釋出的主題:Topic
訂閱的主題:aaaaa
點選連線,然後點選訂閱
4.第一個軟體發訊息:傳送的訊息123456,然後點擊發送
使用者會看到第二個軟體收到訊息
提示:這個軟體是自己開發的,裡面的顯示都是自己規定的.
第二個客戶端的訂閱那一項填寫的是 aaaaa 其實就是在告訴伺服器,我需要資料標識是 aaaaa的訊息 既然你告訴了伺服器了,那麼伺服器只要接收到資料標識是 aaaaa 的訊息,那麼就會主動把訊息發給你
5.同理,讓下面的客戶端把訊息發給上面的客戶端
6.簡要說明:
連線上MQTT伺服器以後,只要是兩個裝置之間訂閱和釋出的主題對上了 那麼這兩個裝置就可以通訊了 對於初學者可能疑惑,你軟體點點點到底內部是怎麼做到的其實MQTT就是一個TCP伺服器,它是在TCP通訊的時候封裝了一套協議. 咱們就叫它MQTT協議,注意本質上就是TCP傳輸資料,這個資料有格式而已! 首先是使用TCP連線,然後傳送MQTT連線協議,然後傳送MQTT訂閱主題的協議. 這樣的話,伺服器就知道你需要哪種標識的資料了. 當伺服器收到這種標識的資料的時候,伺服器就會主動轉發給你. 其實MQTT伺服器主要工作就是做資料轉發,但是你需要告訴它你需要什麼樣的資料.
思考
1.其實理解一個東西最好的方式就是:你要設想如果讓你自己做一個這樣的伺服器,你會怎麼做.
2.現在需求是做一個負責資料轉發的軟體
首先,平時的時候咱做的TCP伺服器都是,一個或者多個客戶端連線咱做的TCP伺服器,然後TCP伺服器處理客戶端的資料. 現在呢!需求變了! 假設我有5個網路裝置,3個手機.我現在想讓網路裝置把資料遠端傳給手機.而且我還需要記錄網路裝置上傳的資料. 假設通訊是這樣的(而且後期還會不停的增加裝置和手機) 3.咋辦??? 1. 需要記錄所有裝置的資料 2. 裝置和手機之間存在多對一和一對多,所以,必須需要個公共的伺服器進行資料的中轉. 假設你就把這個伺服器做成TCP伺服器,有人問,你咋不做成UDP呢? UDP是無連線狀態,傳送資料不好判斷是不是傳送成功,我還是少找些麻煩! 還有就是要實現遠端,有個公網IP就可以,可以自己買個伺服器,上網路公司拉一根專網 或者用自己電腦,用花生殼對映 還是用雲伺服器吧!就是執行在別人的伺服器上的一臺電腦(就是一臺電腦),IP地址直接是公網.方便. 4.怎麼設計這個TCP伺服器??? 1.為了應對這種通訊,首先裝置傳送的資料決不能是單單的資料,必須加點東西 2.如果把傳送的資料帶上標識呢? 假設裝置1傳送的資料是: aaaaa資料 (aaaaa是資料標識,後面是真實資料) 3.然後呢!假設手機1就接收資料標識是aaaaa的資料,怎麼讓伺服器轉發給它呢??? 4.如果手機1在連線上TCP伺服器的時候 告訴TCP伺服器我接收資料標識是 aaaaa的資料 5.通過上面的方式是不是有點眉頭了???? 咱呢姑且把 "告訴TCP伺服器我接收資料標識是 aaaaa的資料"這個事情呢,起個名字 訂閱的主題是 aaaaa 把 "假設裝置1傳送的資料是:aaaaa資料 " 訊息前面的 aaaaa 叫做 釋出的主題是aaaaa 5.總結上面的就是 手機1先連線TCP伺服器,然後呢,規定個協議,告訴TCP伺服器我訂閱的主題是aaaaa 這樣呢伺服器就記住了,當出現訊息前面的主題是aaaaa的訊息的時候,他就把這個訊息發給手機1 當然咱假設,裝置1連線上TCP伺服器,然後,告訴TCP伺服器我訂閱的主題是wwww 這樣呢伺服器就記住了,當出現訊息前面的主題是wwww的訊息的時候,他就把這個訊息發給裝置1 然後裝置1連線上TCP伺服器以後呢,這樣傳送資訊(假設傳送的訊息是123456): aaaaa123456 伺服器一接收到客戶端的訊息,就取出來這個訊息的標識是什麼,取出來的是 aaaaa 然後呢,看下記錄的誰需要訊息標識是aaaaa的訊息,然後找到了手機1 最後把這個訊息傳送給手機1這個客戶端,然後手機1就接收到了1123456這個訊息 同理:手機1傳送wwww998877然後這個訊息就會發給裝置1 ,裝置1就會收到 998877 6.總結 這個伺服器道理上是這樣,伺服器記錄各個裝置的資訊,各個裝置訂閱的主題,然後呢,判斷這個訊息然後進行轉發 但是...咱做個簡單的完全可以做出來,但是要想做的完善,而且要支援龐大訊息數量的裝置(來個百萬級).....不是一朝一夕就可以的. 其實很長時間以前,人們就有這種需求了.多對一和一對多通訊 所以呢,一些組織和單位就開始解決這種問題,開始做這種軟體,所以MQTT就誕生了. 之所以叫MQTT是因為是外國人做的這種TCP伺服器,外國人呢,為實現這種功能的TCP伺服器取了個名字叫 Message Queuing Telemetry Transport 然後取每個首字母就叫 MQTT了 其實有很多家做MQTT軟體,但是呢,我比較喜歡用emqtt來說一下具體的MQTT協議
1,首先咱知道就是個TCP伺服器,所以呢,需要先用TCP連線上他們的伺服器. 2,咱用Android ,C#,QT,網頁等等連線MQTT伺服器的時候有現成的封裝好的庫可以用,其實說白了就是呼叫函式而已..... 3,但是對於微控制器而言要想實現MQTT通訊,那麼就需要藉助網路模組 大部分的網路模組都可以實現TCP通訊,咱呢,就需要在TCP的基礎上按照MQTT協議封裝下咱的資料 注:其實官方給了現成的MQTT的封裝資料和解析資料的程式) https://docs.emqx.io/sdk_tools?category=MQTT_Clients (官方提供的各個開發的庫) 微控制器用下面這個,不過我以前用的這個,因為庫功能很全,佔用記憶體有點大,所以後期使用的是自己重新封裝的.下面是自己當前使用的mqtt最底層
/** ****************************************************************************** * @author yang feng wu * @version V1.0.0 * @date 2019/12/15 * @brief ****************************************************************************** ****************************************************************************** */ #define MQTTCLIENT_C_//如果沒有定義 #include "mqtt_msg.h" #include "string.h" #include "stm32f10x.h" #define MQTT_MAX_FIXED_HEADER_SIZE 3 uint16_t mqtt_message_id = 0; enum mqtt_connect_flag { MQTT_CONNECT_FLAG_USERNAME = 1 << 7, MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, MQTT_CONNECT_FLAG_WILL = 1 << 2, MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 }; //__attribute((__packed__)) struct mqtt_connect_variable_header { uint8_t lengthMsb; uint8_t lengthLsb; uint8_t magic[4]; uint8_t version; uint8_t flags; uint8_t keepaliveMsb; uint8_t keepaliveLsb; }; int mqtt_get_type(unsigned char* buffer) { return (buffer[0] & 0xf0) >> 4; } int mqtt_get_connect_ret_code(unsigned char* buffer) { return (buffer[3]); } int mqtt_get_qos(unsigned char* buffer) { return (buffer[0] & 0x06) >> 1; } int append_string(int *length,unsigned char* buffer,int buffer_length,unsigned char* string, int len) { if((*length) + len + 2 > buffer_length)//加上 ClientID 和 記錄 ClientID個數(兩位) 以後超出了陣列 return -1; buffer[(*length)++] = len >> 8; buffer[(*length)++] = len & 0xff; c_memcpy(buffer + (*length), string, len); (*length) += len; return len + 2; } uint16_t append_message_id(int *length,unsigned char* buffer,int buffer_length, uint16_t message_id) { // If message_id is zero then we should assign one, otherwise // we'll use the one supplied by the caller while(message_id == 0) message_id = ++mqtt_message_id; if((*length) + 2 > buffer_length) return 0; buffer[(*length)++] = message_id >> 8; buffer[(*length)++] = message_id & 0xff; return message_id; } int fini_message(unsigned char **data_ptr,int length,unsigned char* buffer, int type, int dup, int qos, int retain) { int remaining_length = length - MQTT_MAX_FIXED_HEADER_SIZE; if(remaining_length > 127) { buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); buffer[1] = 0x80 | (remaining_length % 128); buffer[2] = remaining_length / 128; length = remaining_length + 3; *data_ptr = buffer; } else { buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); buffer[2] = remaining_length; length = remaining_length + 2; *data_ptr = buffer + 1; } return length; } uint16_t mqtt_get_id(unsigned char* buffer, uint16_t length) { if(length < 1) return 0; switch(mqtt_get_type(buffer)) { case MQTT_MSG_TYPE_PUBLISH: { int i; int topiclen; for(i = 1; i < length; ++i) { if((buffer[i] & 0x80) == 0) { ++i; break; } } if(i + 2 >= length) return 0; topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; if(i + topiclen >= length) return 0; i += topiclen; if(mqtt_get_qos(buffer) > 0) { if(i + 2 >= length) return 0; //i += 2; } else { return 0; } return (buffer[i] << 8) | buffer[i + 1]; } case MQTT_MSG_TYPE_PUBACK: case MQTT_MSG_TYPE_PUBREC: case MQTT_MSG_TYPE_PUBREL: case MQTT_MSG_TYPE_PUBCOMP: case MQTT_MSG_TYPE_SUBACK: case MQTT_MSG_TYPE_UNSUBACK: case MQTT_MSG_TYPE_SUBSCRIBE: { // This requires the remaining length to be encoded in 1 byte, // which it should be. if(length >= 4 && (buffer[1] & 0x80) == 0) return (buffer[2] << 8) | buffer[3]; else return 0; } default: return 0; } } /** * @brief 獲取MQTT返回的資料長度(去掉1和2位元組後面資料的長度) * @param buffer MQTT返回的資料首地址 * @param length 返回的資料個數 * @retval 資料長度 * @warning None * @example **/ int mqtt_get_total_length(unsigned char* buffer, uint16_t length) { int i; int totlen = 0; for(i = 1; i < length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); if((buffer[i] & 0x80) == 0) { ++i; break; } } totlen += i; return totlen; } /** * @brief 打包連線MQTT指令 * @param info MQTT資訊 * @param data_ptr 打包的資料首地址 * @param buffer 打包進的陣列 * @param buffer_length 陣列長度 * @retval 資料長度 * @warning None * @example **/ int mqtt_msg_connect(mqtt_connect_info_t* info,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; struct mqtt_connect_variable_header* variable_header; mqtt_message_id = 0; length = MQTT_MAX_FIXED_HEADER_SIZE;//頭.連線型別1位,資料個數2位(如果大於127就需要兩位) if(length + sizeof(*variable_header) > buffer_length)//陣列不夠儲存的 return 0; variable_header = (void*)(buffer + length);//把陣列分給這個結構體裡面的變數 length += sizeof(*variable_header);//儲存完 連線型別,整個資料個數,版本號個數,版本號,等 variable_header->lengthMsb = 0;//版本名稱個數高位 variable_header->lengthLsb = 4;//版本名稱個數低位 c_memcpy(variable_header->magic, "MQTT", 4);//版本名稱MQTT variable_header->version = 4;//版本號 variable_header->flags = 0;//先清零 variable_header->keepaliveMsb = info->keepalive >> 8;//心跳包時間 variable_header->keepaliveLsb = info->keepalive & 0xff;//心跳包時間 if(info->clean_session)//清除連線資訊 variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; if(info->client_id != NULL && info->client_id[0] != '\0')//client_id { if(append_string(&length,buffer,buffer_length, info->client_id, c_strlen(info->client_id)) < 0)//拷貝 return -1;//陣列不夠用呀... } else return -2;//沒有設定client_id if(info->will_topic != NULL && info->will_topic[0] != '\0')//遺囑 { if(append_string(&length,buffer,buffer_length , info->will_topic, c_strlen(info->will_topic)) < 0)//遺囑的主題 return -3; if(append_string(&length,buffer,buffer_length , info->will_message, c_strlen(info->will_message)) < 0)//遺囑的訊息 return -4; variable_header->flags |= MQTT_CONNECT_FLAG_WILL;//需要遺囑 if(info->will_retain)//遺囑是夠需要伺服器保留 variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;//保留遺囑 variable_header->flags |= (info->will_qos & 3) << 3;//遺囑訊息等級 } if(info->username != NULL && info->username[0] != '\0')//username { if(append_string(&length,buffer,buffer_length, info->username, c_strlen(info->username)) < 0)//拷貝使用者名稱 return -5; variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;//有使用者名稱 } if(info->password != NULL && info->password[0] != '\0')//password { if(append_string(&length,buffer,buffer_length, info->password, c_strlen(info->password)) < 0) return -6; variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;//有密碼 } return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);//最終組合連線MQTT的指令 } /** * @brief 判斷是否連線上MQTT * @param 伺服器返回的資料 * @param * @retval 0 連線成功 * @example **/ int mqtt_msg_connect_ack(unsigned char *buff) { if(mqtt_get_type(buff) == MQTT_MSG_TYPE_CONNACK) { return mqtt_get_connect_ret_code(buff); } return -1; } /** * @brief 斷開連線 * @param data_ptr 打包的資料首地址 * @param buffer 打包進的陣列 * @param buffer_length 陣列長度 * @retval 資料長度 * @warning None * @example **/ int mqtt_msg_disconnect(unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); } /** * @brief 訂閱主題 * @param topic 訂閱的主題 * @param qos 訊息等級 * @param data_ptr 打包的資料首地址 * @param buffer 打包進的陣列 * @param buffer_length 陣列長度 * @retval 資料長度 * @warning None * @example **/ int mqtt_msg_subscribe_topic(unsigned char* topic, int qos,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; if(topic == NULL || topic[0] == '\0') return -1; if((mqtt_message_id = append_message_id(&length, buffer, buffer_length, 0)) == 0) return -2; if(append_string(&length, buffer, buffer_length, topic, c_strlen(topic)) < 0) return -3; if(length + 1 > buffer_length) return -4; buffer[length++] = qos; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); } /** * @brief 判斷是否成功訂閱 * @param buffer 伺服器返回的資料 * @param length 伺服器返回的資料長度 * @retval 0:成功 1:失敗 * @example **/ int mqtt_msg_subscribe_ack(unsigned char* buffer, uint16_t length) { if(mqtt_get_type(buffer) == MQTT_MSG_TYPE_SUBACK) { if(mqtt_get_id(buffer,length) == mqtt_message_id) { return 0; } else { return 1; } } else { return 1; } } /** * @brief 釋出訊息 * @param topic 主題 * @param data 訊息 * @param data_length 訊息長度 * @param qos 訊息等級 * @param retain 是否需要保留訊息 * @param data_ptr 打包的資料首地址 * @param buffer 打包進的陣列 * @param buffer_length 陣列長度 * @retval 資料長度 * @warning None * @example **/ int mqtt_msg_publish(unsigned char* topic,unsigned char* date, int data_length, int qos, int retain,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; if(topic == NULL || topic[0] == '\0') return -1; if(append_string(&length, buffer, buffer_length, topic, strlen(topic)) < 0) return -2; if(qos > 0) { if((mqtt_message_id = append_message_id(&length, buffer, buffer_length, 0)) == 0) return -3; } else mqtt_message_id = 0; if(length + data_length > buffer_length) return -4; memcpy(buffer + length, date, data_length); length += data_length; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } int mqtt_msg_puback(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; if(append_message_id(&length, buffer, buffer_length,message_id) == 0) return -1; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); } int mqtt_msg_pubrec(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; if(append_message_id(&length, buffer, buffer_length,message_id) == 0) return -1; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); } int mqtt_msg_pubrel(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; if(append_message_id(&length, buffer, buffer_length,message_id) == 0) return -1; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); } int mqtt_msg_pubcomp(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; if(append_message_id(&length, buffer, buffer_length,message_id) == 0) return -1; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); } const char* mqtt_get_publish_topic(unsigned char* buffer, uint16_t* length) { int i; int totlen = 0; int topiclen; for(i = 1; i < *length; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i -1)); if((buffer[i] & 0x80) == 0) { ++i; break; } } totlen += i; if(i + 2 >= *length) return NULL; topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; if(i + topiclen > *length) return NULL; *length = topiclen; return (const char*)(buffer + i); } const char* mqtt_get_publish_data(unsigned char* buffer, uint16_t* length) { int i; int totlen = 0; int topiclen; int blength = *length; *length = 0; for(i = 1; i < blength; ++i) { totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); if((buffer[i] & 0x80) == 0) { ++i; break; } } totlen += i; if(i + 2 >= blength) return NULL; topiclen = buffer[i++] << 8; topiclen |= buffer[i++]; if(i + topiclen >= blength) return NULL; i += topiclen; if(mqtt_get_qos(buffer) > 0) { if(i + 2 >= blength) return NULL; i += 2; } if(totlen < i) return NULL; if(totlen <= blength) *length = totlen - i; else *length = blength - i; return (const char*)(buffer + i); } /** * @brief 打包伺服器返回的心跳包資料(用不到) * @param data_ptr 打包的資料首地址 * @param buffer 打包進的陣列 * @param buffer_length 陣列長度 * @retval 資料長度 * @warning None * @example **/ int mqtt_msg_pingresp(unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); } /** * @brief 獲取傳送給伺服器的心跳包資料 * @param data_ptr 打包的資料首地址 * @param buffer 打包進的陣列 * @param buffer_length 陣列長度 * @retval 資料長度 * @warning None * @example **/ int mqtt_msg_pingreq(unsigned char **data_ptr,unsigned char* buffer,int buffer_length) { int length; length = MQTT_MAX_FIXED_HEADER_SIZE; return fini_message(data_ptr,length, buffer, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); }
#ifndef MQTTCLIENT_H_ #define MQTTCLIENT_H_ #ifndef MQTTCLIENT_C_//如果沒有定義 #define MQTTCLIENT_Cx_ extern #else #define MQTTCLIENT_Cx_ #endif #include "string.h" #include "stm32f10x.h" #define c_memcpy memcpy #define c_memset memset #define c_strlen strlen enum mqtt_message_type { MQTT_MSG_TYPE_CONNECT = 1, MQTT_MSG_TYPE_CONNACK = 2, MQTT_MSG_TYPE_PUBLISH = 3, MQTT_MSG_TYPE_PUBACK = 4, MQTT_MSG_TYPE_PUBREC = 5, MQTT_MSG_TYPE_PUBREL = 6, MQTT_MSG_TYPE_PUBCOMP = 7, MQTT_MSG_TYPE_SUBSCRIBE = 8, MQTT_MSG_TYPE_SUBACK = 9, MQTT_MSG_TYPE_UNSUBSCRIBE = 10, MQTT_MSG_TYPE_UNSUBACK = 11, MQTT_MSG_TYPE_PINGREQ = 12, MQTT_MSG_TYPE_PINGRESP = 13, MQTT_MSG_TYPE_DISCONNECT = 14 }; enum mqtt_connack_return_code { MQTT_CONN_FAIL_SERVER_NOT_FOUND = -5, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG = -4, MQTT_CONN_FAIL_DNS = -3, MQTT_CONN_FAIL_TIMEOUT_RECEIVING = -2, MQTT_CONN_FAIL_TIMEOUT_SENDING = -1, MQTT_CONNACK_ACCEPTED = 0, MQTT_CONNACK_REFUSED_PROTOCOL_VER = 1, MQTT_CONNACK_REFUSED_ID_REJECTED = 2, MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3, MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS = 4, MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5 }; //連線MQTT指令 typedef struct mqtt_connect_info { unsigned char* client_id; unsigned char* username; unsigned char* password; unsigned char* will_topic; unsigned char* will_message; int keepalive; int will_qos; int will_retain; int clean_session; } mqtt_connect_info_t; int mqtt_get_type(unsigned char* buffer); int mqtt_get_connect_ret_code(unsigned char* buffer); int mqtt_get_qos(unsigned char* buffer); uint16_t mqtt_get_id(unsigned char* buffer, uint16_t length); int mqtt_msg_connect(mqtt_connect_info_t* info,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); int mqtt_msg_connect_ack(unsigned char *buff); int mqtt_msg_subscribe_topic(unsigned char* topic, int qos,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); int mqtt_msg_subscribe_ack(unsigned char* buffer, uint16_t length); int mqtt_msg_publish(unsigned char* topic,unsigned char* date, int data_length, int qos, int retain,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); int mqtt_get_total_length(unsigned char* buffer, uint16_t length); int mqtt_msg_puback(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); int mqtt_msg_pubrel(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); int mqtt_msg_pubrec(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); int mqtt_msg_pubcomp(uint16_t message_id,unsigned char **data_ptr,unsigned char* buffer,int buffer_length); const char* mqtt_get_publish_topic(unsigned char* buffer, uint16_t* length); const char* mqtt_get_publish_data(unsigned char* buffer, uint16_t* length); int mqtt_msg_pingreq(unsigned char **data_ptr,unsigned char* buffer,int buffer_length); #endif4.咱利用網路模組的TCP連線上以後 然後需要傳送第一條訊息(注:並不是上來就可以訂閱主題的) MQTT軟體規定呢,你傳送的第一條資訊是連線資訊(相當於咱要先登入) 他規定了幾個引數! ClientID: 各個客戶端必須設定一個ID,各個客戶端必須都不一樣 假設是 123456 使用者名稱: 咱安裝MQTT軟體的時候可以設定MQTT軟體的登入的使用者名稱 假設是yang 密碼: 咱安裝MQTT軟體的時候可以設定MQTT軟體的登入的密碼 假設是 11223344
測試MQTT連線協議
1.以下協議是我為了能夠讓大家好理解整個MQTT協議,所以再次做了精簡(切勿使用下面的作為工程專案)/** * @brief 連線伺服器的打包函式 * @param * @retval * @example **/ int ConnectMqtt(char *ClientID,char *Username,char *Password) { int ClientIDLen = strlen(ClientID); int UsernameLen = strlen(Username); int PasswordLen = strlen(Password); int DataLen = 0; int Index = 2; int i = 0; DataLen = 12 + 2+2+ClientIDLen+UsernameLen+PasswordLen; MqttSendData[0] = 0x10; //MQTT Message Type CONNECT MqttSendData[1] = DataLen; //剩餘長度(不包括固定頭部) MqttSendData[Index++] = 0; // Protocol Name Length MSB MqttSendData[Index++] = 4; // Protocol Name Length LSB MqttSendData[Index++] = 'M'; // ASCII Code for M MqttSendData[Index++] = 'Q'; // ASCII Code for Q MqttSendData[Index++] = 'T'; // ASCII Code for T MqttSendData[Index++] = 'T'; // ASCII Code for T MqttSendData[Index++] = 4; // MQTT Protocol version = 4 MqttSendData[Index++] = 0xc2; // conn flags MqttSendData[Index++] = 0; // Keep-alive Time Length MSB MqttSendData[Index++] = 60; // Keep-alive Time Length LSB 60S心跳包 MqttSendData[Index++] = (0xff00&ClientIDLen)>>8;// Client ID length MSB MqttSendData[Index++] = 0xff&ClientIDLen; // Client ID length LSB for(i = 0; i < ClientIDLen; i++) { MqttSendData[Index + i] = ClientID[i]; } Index = Index + ClientIDLen; if(UsernameLen > 0) { MqttSendData[Index++] = (0xff00&UsernameLen)>>8;//username length MSB MqttSendData[Index++] = 0xff&UsernameLen; //username length LSB for(i = 0; i < UsernameLen ; i++) { MqttSendData[Index + i] = Username[i]; } Index = Index + UsernameLen; } if(PasswordLen > 0) { MqttSendData[Index++] = (0xff00&PasswordLen)>>8;//password length MSB MqttSendData[Index++] = 0xff&PasswordLen; //password length LSB for(i = 0; i < PasswordLen ; i++) { MqttSendData[Index + i] = Password[i]; } Index = Index + PasswordLen; } return Index; }
假設我ClientID填寫的是:123456 UserName填寫的是:yang Password填寫的是:11223344 執行以後得到以下資料 10 22 00 04 4D 51 54 54 04 C2 00 78 00 06 31 32 33 34 35 36 00 04 79 61 6E 67 00 08 31 31 32 32 33 33 34 34 然後把這個資料發給TCP 伺服器,如果沒有錯誤,伺服器就會回 20 02 00 00 咱可以用TCP除錯助手試一試 IP地址:47.92.31.46注意:IP地址可能不能不能使用了 如果IP不可以連線可以填域名 mnif.cn
埠號:1883
先說一件事情所有的MQTT資料哈第一個位元組是說明整個資料是幹什麼的資料 第二個位元組是說它後面的資料的總個數 10 : 固定,MQTT規定的連線用0x10 22: 是說0x22後面有0x22個數據34個 00 04: 後面記錄MQTT版本號的位元組個數 4D 51 54 54: M Q T T版本號字元 這個是4版本,不同版本不一樣 3版本的是MQIsdp 額,瞭解就可以 04: 版本號是 0x04 C2:這個呢想了解具體呢,需要看協議http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028
C2 的二進位制是 1100 0010
bit7 bit6:是否有使用者名稱和密碼 bit5 :遺囑是否需要伺服器保留 bit4 bit3:遺囑的訊息等級 bit2:是否設定了遺囑 bit1:是否清除以前的連線資訊 bit0:保留,預設0 上面就是說有使用者名稱和密碼,每次連線的時候清除連線資訊,沒有設定遺囑(後面會說)訂閱主題
假設告訴伺服器我訂閱的是2222 假設訂閱的時候訂閱的主題的訊息標識是1,訊息等級是0 那麼打包以後就是 82 09 00 01 00 04 32 32 32 32 00然後把這個資料發給TCP伺服器讓測試用TCP除錯助手訂閱,然後用咱的MQTT除錯助手發信息給咱的TCP除錯助手 注意:現在咱的TCP可能已經斷開了,因為咱的TCP除錯助手沒有在規定時間內傳送心跳包 首先準備好除錯助手
然後用MQTT除錯助手發訊息
/** * @brief MQTT訂閱/取消訂閱資料打包函式 * @param SendData * @param topic 主題 * @param qos 訊息等級 * @param whether 訂閱/取消訂閱請求包 * @retval * @example **/ int MqttSubscribeTopic(char *topic,u8 qos,u8 whether) { int topiclen = strlen(topic); int i=0,index = 0; if(whether) MqttSendData[index++] = 0x82; //0x82 //訊息型別和標誌 SUBSCRIBE 訂閱 else MqttSendData[index++] = 0xA2; //0xA2 取消訂閱 MqttSendData[index++] = topiclen + 5; //剩餘長度(不包括固定頭部) MqttSendData[index++] = 0; //訊息識別符號,高位 MqttSendData[index++] = 0x01; //訊息識別符號,低位 MqttSendData[index++] = (0xff00&topiclen)>>8; //主題長度(高位在前,低位在後) MqttSendData[index++] = 0xff&topiclen; //主題長度 for (i = 0;i < topiclen; i++) { MqttSendData[index + i] = topic[i]; } index = index + topiclen; if(whether) { MqttSendData[index] = qos;//QoS級別 index++; } return index; }
假設上面的MqttSubscribeTopic("2222",0,1)
假設是1 那麼一個客戶端傳送訊息以後呢,伺服器一看訊息等級是1,那麼就會回給那個傳送訊息的客戶端一個應答訊息 客戶端可以根據有沒有回覆應答確認發沒傳送成功
假設是2 這個呢伺服器和客戶端之間會有雙向的應答!後面會詳細說.
如果按照上面發呢,伺服器會回 90 03 00 01 00 90:固定 03:後面的資料長度 00 01:這條主題的標識 00:訊息等級
如果訂閱多個主題假設訂閱兩個主題 訊息等級第一個是0 第二個是1 90 04 00 01 00 01 90:固定 03:後面的資料長度 00 01:這條主題的標識 00:訊息等級 01:訊息等級
假設訂閱失敗 後面的訊息等級就會變為 0x80 (訂閱一個主題) 90 03 00 01 00 90:固定 03:後面的資料長度 00 01:這條主題的標識 80:訊息等級變為0x80
釋出訊息
釋出的時候呢,資訊裡面都有以下內容 釋出的主題,訊息,訊息等級,是不是需要伺服器保留訊息,訊息的標識/** * @brief MQTT釋出資料打包函式 * @param mqtt_message * @param topic 主題 * @param qos 訊息等級 * @retval * @example **/ int MqttPublishData(char * topic, char * message, u8 qos) { int topic_length = strlen(topic); int message_length = strlen(message); int i,index=0; static u16 id=0; MqttSendData[index++] = 0x30; // MQTT Message Type PUBLISH 30:訊息等級是0 32訊息等級是1 34訊息等級是2 if(qos) MqttSendData[index++] = 2 + topic_length + 2 + message_length;//資料長度 else MqttSendData[index++] = 2 + topic_length + message_length; // Remaining length MqttSendData[index++] = (0xff00&topic_length)>>8;//主題長度 MqttSendData[index++] = 0xff&topic_length; for(i = 0; i < topic_length; i++) { MqttSendData[index + i] = topic[i];//拷貝主題 } index += topic_length; if(qos) { MqttSendData[index++] = (0xff00&id)>>8; MqttSendData[index++] = 0xff&id; id++; } for(i = 0; i < message_length; i++) { MqttSendData[index + i] = message[i];//拷貝資料 } index += message_length; return index; }釋出的主題: 誰訂閱了這個主題,伺服器就會把相應的訊息傳給誰 訊息等級:上面說了 是不是需要伺服器保留訊息:一會和遺囑一塊說 訊息的標識:每條訊息加個標識,用來區分訊息
遺囑
還記得上面 我直接說遺囑是啥意思哈! 假設我手機和一個裝置訂閱主題和釋出主題對應,我就能和這個裝置通訊了 但是,我怎麼知道這個裝置掉線了呢? 當然完全可以自己發信息給那個裝置,如果不回覆,就說明掉線了 但是呢!MQTT伺服器提供了一種方式 假設我設定好一個裝置的遺囑訊息是offline 遺囑釋出的主題是 aaaaa 另一個裝置訂閱的主題是 aaaaa 如果裝置掉線,伺服器就會給訂閱了aaaaa的裝置傳送offline還記得上面說的不 伺服器如果在你設定的心跳包時間的1.5倍收不到心跳包就認為你掉線了. 當然訂閱系統主題也可以,這個後面再說.
心跳包
MQTT規定的,傳送完連線協議之後 傳送的心跳包資料是C0 00 傳送時間:連線協議裡面的心跳包時間(你可以提前發) 然後伺服器回覆 D0 00接收所有裝置資料
1.有人會問,如果我想監控所有裝置的資料應該怎麼做
就是說,我有個所有裝置都可以管理的後臺 假設我是用C#做了一個MQTT的上位機,監控所有的資料 笨法: 你訂閱的時候把所有裝置釋出的主題全部訂閱一遍 假設現在其中一個裝置,想獲取其它連個裝置的資料 其它兩個裝置釋出的主題如下: 另一個裝置 訂閱 aaaaa 然後再訂閱 wwww用系統主題監控裝置上下線
1.在伺服器上訂閱 $SYS/#
在剛一執行訂閱
$SYS/brokers : 叢集節點列表
$SYS/brokers/[email protected]/sysdescr 伺服器描述
$SYS/brokers/[email protected]/version 伺服器版本
咱主要關注的是下面的
會接收到這個主題: $SYS/brokers/[email protected]/datetime
這個主題的訊息是: 2020-12-17 13:06:16
這個主題每隔1S釋出一次時間,這個是系統自帶的
會接收到這個主題: $SYS/brokers/[email protected]/uptime
這個主題的訊息是: 175 days,18 hours, 52 minutes, 19 seconds
這個主題每隔1S釋出一次時間,這個是MQTT伺服器啟動執行的時間
某個裝置上線,下面是clientid為 e28d35c7-8 的裝置上線了
$SYS/brokers/[email protected]/clients/e28d35c7-8/connected
某個裝置掉線,下面是clientid為acdd2b6a-e的裝置掉線了
$SYS/brokers/[email protected]/clients/acdd2b6a-e/disconnected
2.監控所有裝置上下線只需要
監控裝置上線,訂閱:$SYS/brokers/[email protected]/clients/+/connected
監控裝置下線,訂閱:$SYS/brokers/[email protected]/clients/+/disconnected
提示:上面的加號代表任意.這也是MQTT的一個招式.
MQTT訊息等級和DUP
1.假設客戶端1 釋出的主題是 1111 ;訊息等級是:0;傳送的訊息是999最終傳送的資訊如下: 30 0b 00 04 31 31 31 31 39 39 39 訊息等級是0是說明該訊息傳送出去就完事了,伺服器不會回覆任何應答資訊. 至於該訊息發沒發給伺服器,不知道! 假設客戶端2 訂閱的主題是:1111訊息等級是 0 假設客戶端1 確實把訊息發給了伺服器 客戶端2 收到訊息以後,不需要做任何操作 2.假設客戶端1 釋出的主題是 1111 ;訊息等級是:1;傳送的訊息是999最終傳送的資訊如下: 32 0b 00 04 31 31 31 31 XX XX 39 39 39 XX XX是在傳送的時候需要加上的訊息識別符號: 訊息識別符號XX XX隨意即可:範圍1-65535 假設訊息識別符號是 00 01 傳送完以上訊息以後,伺服器會回覆: (PUBACK) 告訴客戶端我收到了 40 02 00 01 (00 01就是咱上面傳送的訊息識別符號) 這樣就證明訊息確實送達給了伺服器 如果客戶端1 釋出完訊息以後沒有接收到伺服器的應答 則可以重新發布訊息 32 0b 00 04 31 31 31 31 XX XX 39 39 39 XX XX可以和上次的一樣,也可以不一樣 3.假設客戶端2訂閱了主題是 1111 ;訊息等級是:1 伺服器接收到客戶端1傳送的訊息之後,轉發給客戶端2 32 0b 00 04 31 31 31 31 XX XX 39 39 39 注意現在的XX XX(訊息識別符號)是伺服器自己隨機生成的了 假設識別符號是 00 02 客戶端2在接收到訊息之後需要返回應答(PUBACK) 告訴伺服器我收到了 40 02 00 02如果客戶端2不回覆:40 02 00 02(後面咱就叫 PUBACK) 伺服器便會一直髮送訊息給客戶端2 3A 0b 00 04 31 31 31 31 00 02 39 39 39 注意開頭變為了 3A (伺服器自動會把重傳標誌置一) 高4位是 3 固定 後面四位: 第一位:DUP標記這條訊息是不是重傳的 第2,3位:訊息等級01:訊息等級1 10:訊息等級2 最後一位:RETAIN 是否需要伺服器保留這條訊息 本來是 32 0011 0010 變為了 3A 0011 1010 其實伺服器加上DUP是為了讓客戶端知道,我這條訊息是重傳的, 因為伺服器第一次發的時候客戶端沒有返回PUBACK,但是伺服器知道我確實是傳給了客戶端 客戶端這邊假設真的是沒有及時的回覆PUBACK,那麼有兩種方式處理 1.客戶端再次接收到訊息以後,無論訊息有沒有DUP標誌,直接處理訊息 如果判斷這條訊息是需要返回 PUBACK的,那麼直接根據訊息裡面的訊息識別符號返回 PUBACK 即可 2.判斷下如果有DUP標誌,那麼再提取下訊息標識,看一下我先前是不是處理了有相同訊息識別符號的訊息 如果有就說明我已經處理了,只是沒有返回PUBACK,那麼我不去處理這條訊息 直接根據訊息裡面的訊息識別符號返回PUBACK就可以 2.2.3 其實.... 但是整體來說,對於訊息等級是1的訊息統統處理即可 然後根據訊息裡面的訊息識別符號返回PUBACK即可 先說一下為什麼 其實在客戶端1釋出訊息等級是1的訊息的時候, 如果客戶端1由於某些原因沒有接收到伺服器的PUBACK 那麼客戶端1還會再發布先前的訊息 其實現在就有兩條或者多條相同的訊息在伺服器裡面 這些相同的訊息(識別符號不一樣的訊息)就會發給客戶端2 如果客戶端2一直不應答(PUBACK),那麼伺服器便會把所有的沒有收到應答的訊息 的DUP標記置一以後不停的發給客戶端2... 直至客戶端2應答了所有的訊息,或者客戶端2斷線了 伺服器才停止傳送 對於微控制器而言,這些處理只能自己去實現 為了方便和節省記憶體,對於訊息等級是1的訊息 可以直接根據訊息裡面的訊息識別符號返回PUBACK 所以對於訊息等級是1的訊息,其實客戶端至少會接收到1次訊息
十,補充(關於retain)