STM串列埠轉tcp實現Mqtt客戶端
阿新 • • 發佈:2019-01-31
#include "emqtt.h"
#include "usart4_wifi.h"
#define MQTT_DUP_FLAG 1<<3
#define MQTT_QOS0_FLAG 0<<1
#define MQTT_QOS1_FLAG 1<<1
#define MQTT_QOS2_FLAG 2<<1
#define MQTT_RETAIN_FLAG 1
#define MQTT_CLEAN_SESSION 1<<1
#define MQTT_WILL_FLAG 1<<2
#define MQTT_WILL_RETAIN 1<<5
#define MQTT_USERNAME_FLAG 1<<7
#define MQTT_PASSWORD_FLAG 1<<6
#define TOPIC_MAX_LENGTH 64
#define MESSAGE_MAX_LENGTH 1024
//fixed_head×î´óΪ3 qossize×î´óΪ2
uint8_t mqtt_num_rem_len_bytes(const uint8_t* buf)
{
uint8_t num_bytes = 1 ;
if ((buf[1] & 0x80) == 0x80)
{
num_bytes++;
if ((buf[2] & 0x80) == 0x80)
{
num_bytes ++;
if ((buf[3] & 0x80) == 0x80)
{
num_bytes ++;
}
}
}
return num_bytes;
}
uint16_t mqtt_parse_rem_len(const uint8_t* buf)
{
uint16_t multiplier = 1;
uint16_t value = 0;
uint8_t digit;
buf++;
do {
digit = *buf;
value += (digit & 127) * multiplier;
multiplier *= 128;
buf++;
} while ((digit & 128) != 0);
return value;
}
uint16_t mqtt_parse_msg_id(const uint8_t* buf)
{
uint8_t type = MQTTParseMessageType(buf);
uint8_t qos = MQTTParseMessageQos(buf);
uint16_t id = 0;
uint8_t rlb;
uint8_t offset;
//printf("mqtt_parse_msg_id\n");
if(type >= MQTT_MSG_PUBLISH && type <= MQTT_MSG_UNSUBACK)
{
if(type == MQTT_MSG_PUBLISH)
{
if(qos != 0)
{
rlb = mqtt_num_rem_len_bytes(buf);
offset = *(buf+1+rlb)<<8; // topic UTF MSB
offset |= *(buf+1+rlb+1); // topic UTF LSB
offset += (1+rlb+2); // fixed header + topic size
id = *(buf+offset)<<8; // id MSB
id |= *(buf+offset+1); // id LSB
}
}
else
{
// fixed header length
// 1 for "flags" byte + rlb for length bytes
rlb = mqtt_num_rem_len_bytes(buf);
id = *(buf+1+rlb)<<8; // id MSB
id |= *(buf+1+rlb+1); // id LSB
}
}
return id;
}
uint16_t mqtt_parse_pub_topic(const uint8_t* buf, uint8_t* topic)
{
const uint8_t* ptr;
uint16_t topic_len = mqtt_parse_pub_topic_ptr(buf, &ptr);
//printf("mqtt_parse_pub_topic\n");
if(topic_len != 0 && ptr != NULL)
{
memcpy(topic, ptr, topic_len);
}
return topic_len;
}
uint16_t mqtt_parse_pub_topic_ptr(const uint8_t* buf, const uint8_t **topic_ptr)
{
uint16_t len = 0;
uint8_t rlb;
//printf("mqtt_parse_pub_topic_ptr\n");
if(MQTTParseMessageType(buf) == MQTT_MSG_PUBLISH)
{
// fixed header length = 1 for "flags" byte + rlb for length bytes
rlb = mqtt_num_rem_len_bytes(buf);
len = *(buf+1+rlb)<<8; // MSB of topic UTF
len |= *(buf+1+rlb+1); // LSB of topic UTF
// start of topic = add 1 for "flags", rlb for remaining length, 2 for UTF
*topic_ptr = (buf + (1+rlb+2));
}
else
{
*topic_ptr = NULL;
}
return len;
}
uint16_t mqtt_parse_publish_msg(const uint8_t* buf, uint8_t* msg)
{
const uint8_t* ptr;
//printf("mqtt_parse_publish_msg\n");
uint16_t msg_len = mqtt_parse_pub_msg_ptr(buf, &ptr);
if(msg_len != 0 && ptr != NULL)
{
memcpy(msg, ptr, msg_len);
}
return msg_len;
}
uint16_t mqtt_parse_pub_msg_ptr(const uint8_t* buf, const uint8_t **msg_ptr)
{
uint16_t len = 0;
uint8_t rlb;
uint8_t offset;
//printf("mqtt_parse_pub_msg_ptr\n");
if(MQTTParseMessageType(buf) == MQTT_MSG_PUBLISH)
{
// message starts at
// fixed header length + Topic (UTF encoded) + msg id (if QoS>0)
rlb = mqtt_num_rem_len_bytes(buf);
offset = (*(buf+1+rlb))<<8; // topic UTF MSB
offset |= *(buf+1+rlb+1); // topic UTF LSB
offset += (1+rlb+2); // fixed header + topic size
if(MQTTParseMessageQos(buf))
{
offset += 2; // add two bytes of msg id
}
*msg_ptr = (buf + offset);
// offset is now pointing to start of message
// length of the message is remaining length - variable header
// variable header is offset - fixed header
// fixed header is 1 + rlb
// so, lom = remlen - (offset - (1+rlb))
len = mqtt_parse_rem_len(buf) - (offset-(rlb+1));
}
else
{
*msg_ptr = NULL;
}
return len;
}
void mqtt_init(mqtt_broker_handle_t* broker, const char* clientid)
{
// Connection options
broker->alive = 300; // 300 seconds = 5 minutes
broker->seq = 1; // Sequency for message indetifiers
// Client options
memset(broker->clientid, 0, sizeof(broker->clientid));
memset(broker->username, 0, sizeof(broker->username));
memset(broker->password, 0, sizeof(broker->password));
if(clientid)
{
strncpy(broker->clientid, clientid, sizeof(broker->clientid));
}
else
{
strcpy(broker->clientid, "emqtt");
}
// Will topic
broker->clean_session = 1;
}
void mqtt_init_auth(mqtt_broker_handle_t* broker, const char* username, const char* password)
{
if(username && username[0] != '\0')
strncpy(broker->username, username, sizeof(broker->username)-1);
if(password && password[0] != '\0')
strncpy(broker->password, password, sizeof(broker->password)-1);
}
void mqtt_set_alive(mqtt_broker_handle_t* broker, uint16_t alive)
{
broker->alive = alive;
}
int mqtt_connect(mqtt_broker_handle_t* broker)
{
uint8_t flags = 0x00;
uint16_t clientidlen = strlen(broker->clientid);
uint16_t usernamelen = strlen(broker->username);
uint16_t passwordlen = strlen(broker->password);
uint16_t payload_len = clientidlen + 2;
// Variable header
uint8_t var_header[] =
{
0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70, // Protocol name: MQIsdp
0x03, // Protocol version
0x00, // Connect flags
0x00, 0x00 // Keep alive
};
// Fixed header
uint8_t fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length
uint8_t remainLen;
//¶àÒ»¸ö×Ö½Ú·ÀÖ¹³öÏÖ²»¹»µÄÇé¿ö
uint8_t fixed_header[3];
uint16_t offset = 0;
//¶à200¸ö×Ö½Ú·ÀÖ¹³öÏÖ²»¹»µÄÇé¿ö
uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+200];
// Preparing the flags
//²»ÉèÖÃÓû§ÃûºÍÃÜÂëÈ¥µôÁ½¸öif
if(usernamelen)
{
payload_len += usernamelen + 2;
flags |= MQTT_USERNAME_FLAG;
}
if(passwordlen)
{
payload_len += passwordlen + 2;
flags |= MQTT_PASSWORD_FLAG;
}
//ÿ´Î¶¼³õʼ»¯ËùÒÔ´Ë´¦Ó¦¸ÃÊÇ1
if(broker->clean_session)
{
flags |= MQTT_CLEAN_SESSION;
}
//¸üжÔÓ¦µÄflags±ê־λ
var_header[9] = flags;
var_header[10] = broker->alive>>8;
var_header[11] = broker->alive&0xFF;
remainLen = sizeof(var_header)+payload_len;
//¼Ù¶¨´Ë´¦²»»á³¬¹ý127¸ö×Ö·ûÔò±£ÁôfixedHeaderSize
if (remainLen > 127)
{
fixedHeaderSize++; // add an additional byte for Remaining Length
}
// Message Type
fixed_header[0] = MQTT_MSG_CONNECT;
// Remaining Length
if (remainLen <= 127)
{
fixed_header[1] = remainLen;
}
else
{
// first byte is remainder (mod) of 128, then set the MSB to indicate more bytes
fixed_header[1] = remainLen % 128;
fixed_header[1] = fixed_header[1] | 0x80;
// second byte is number of 128s
fixed_header[2] = remainLen / 128;
}
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, fixedHeaderSize);
offset += fixedHeaderSize;//sizeof(fixed_header);
memcpy(packet+offset, var_header, sizeof(var_header));
offset += sizeof(var_header);
// Client ID - UTF encoded
packet[offset++] = clientidlen>>8;
packet[offset++] = clientidlen&0xFF;
memcpy(packet+offset, broker->clientid, clientidlen);
offset += clientidlen;
if(usernamelen)
{
// Username - UTF encoded
packet[offset++] = usernamelen>>8;
packet[offset++] = usernamelen&0xFF;
memcpy(packet+offset, broker->username, usernamelen);
offset += usernamelen;
}
if(passwordlen)
{
// Password - UTF encoded
packet[offset++] = passwordlen>>8;
packet[offset++] = passwordlen&0xFF;
memcpy(packet+offset, broker->password, passwordlen);
offset += passwordlen;
}
//·¢ËÍÊý¾Ý°ü
uart4SendChars(packet,offset);
return 1;
}
int mqtt_ping()
{
uint8_t packet[] = {
MQTT_MSG_PINGREQ, // Message Type, DUP flag, QoS level, Retain
0x00 // Remaining length
};
//·¢ËÍpingÊý¾Ý°ü
uart4SendChars(packet,sizeof(packet));
return 1;
}
int mqtt_publish_with_qos(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint8_t retain, uint8_t qos, uint16_t* message_id)
{
const uint16_t topiclen = strlen(topic);
const uint16_t msglen = strlen(msg);
uint8_t qos_flag = MQTT_QOS0_FLAG;
uint8_t qos_size = 0; // No QoS included
//½«Êý×é·ÖÅä³É×î´óÖµ topiclen×î´ó64
//msg ×î´ó1024
uint8_t var_header[TOPIC_MAX_LENGTH+2+2];//topiclen+2+qos_size
uint8_t varHeaderSize;
uint8_t fixedHeaderSize = 2;
uint16_t remainLen;
uint8_t fixed_header[3];
uint8_t packet[sizeof(fixed_header)+sizeof(var_header)+MESSAGE_MAX_LENGTH];
if(qos == 1)
{
qos_size = 2; // 2 bytes for QoS
qos_flag = MQTT_QOS1_FLAG;
}
else if(qos == 2)
{
qos_size = 2; // 2 bytes for QoS
qos_flag = MQTT_QOS2_FLAG;
}
varHeaderSize=topiclen+2+qos_size;
// Variable header
// Topic size (2 bytes), utf-encoded topic
memset(var_header, 0, sizeof(var_header));
var_header[0] = topiclen>>8;
var_header[1] = topiclen&0xFF;
memcpy(var_header+2, topic, topiclen);
if(qos_size)
{
var_header[topiclen+2] = broker->seq>>8;
var_header[topiclen+3] = broker->seq&0xFF;
if(message_id)
{ // Returning message id
*message_id = broker->seq;
}
broker->seq++;
}
// Fixed header
// the remaining length is one byte for messages up to 127 bytes, then two bytes after that
// actually, it can be up to 4 bytes but I'm making the assumption the embedded device will only
// need up to two bytes of length (handles up to 16,383 (almost 16k) sized message)
// Default size = one byte Message Type + one byte Remaining Length
remainLen = varHeaderSize+msglen;//sizeof(var_header)
if (remainLen > 127)
{
fixedHeaderSize++; // add an additional byte for Remaining Length
}
// Message Type, DUP flag, QoS level, Retain
fixed_header[0] = MQTT_MSG_PUBLISH | qos_flag;
if(retain)
{
fixed_header[0] |= MQTT_RETAIN_FLAG;
}
// Remaining Length
if (remainLen <= 127) {
fixed_header[1] = remainLen;
} else {
// first byte is remainder (mod) of 128, then set the MSB to indicate more bytes
fixed_header[1] = remainLen % 128;
fixed_header[1] = fixed_header[1] | 0x80;
// second byte is number of 128s
fixed_header[2] = remainLen / 128;
}
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, fixedHeaderSize);
memcpy(packet+fixedHeaderSize, var_header, varHeaderSize);
memcpy(packet+fixedHeaderSize+varHeaderSize, msg, msglen);
//·¢ËÍÊý¾Ý°ü
uart4SendChars(packet,fixedHeaderSize+varHeaderSize+msglen);
return 1;
}
int mqtt_publish(mqtt_broker_handle_t* broker, const char* topic, const char* msg, uint8_t retain)
{
return mqtt_publish_with_qos(broker, topic, msg, retain, 0, NULL);
}
int mqtt_pubrel(mqtt_broker_handle_t* broker, uint16_t message_id)
{
uint8_t packet[] = {
MQTT_MSG_PUBREL | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
0x02, // Remaining length
0x00,
0x00
};
packet[2]=message_id>>8;
packet[3]=message_id&0xFF;
//·¢ËÍÊý¾Ý°ü
uart4SendChars(packet,sizeof(packet));
return 1;
}
int mqtt_subscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id)
{
const uint16_t topiclen = strlen(topic);
// Variable header
uint8_t var_header[2]; // Message ID
//topic ×î´ó64 Topic size (2 bytes), utf-encoded topic, QoS byte
uint8_t utf_topic[TOPIC_MAX_LENGTH+3];
// Fixed header
uint8_t fixed_header[] = {
MQTT_MSG_SUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
0x00
};
//
uint8_t packet[sizeof(var_header)+sizeof(fixed_header)+sizeof(utf_topic)];
var_header[0] = broker->seq>>8;
var_header[1] = broker->seq&0xFF;
if(message_id)
{ // Returning message id
*message_id = broker->seq;
}
broker->seq++;
// utf topic
// Topic size (2 bytes), utf-encoded topic, QoS byte
memset(utf_topic, 0, sizeof(utf_topic));
utf_topic[0] = topiclen>>8;
utf_topic[1] = topiclen&0xFF;
memcpy(utf_topic+2, topic, topiclen);
//sizeof(var_header)+sizeof(utf_topic)
fixed_header[1]=sizeof(var_header)+topiclen+3;
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, sizeof(fixed_header));
memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header));
memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, topiclen+3);
//·¢ËÍÊý¾Ý°ü
uart4SendChars(packet,sizeof(fixed_header)+sizeof(var_header)+topiclen+3);
return 1;
}
int mqtt_unsubscribe(mqtt_broker_handle_t* broker, const char* topic, uint16_t* message_id)
{
const uint16_t topiclen = strlen(topic);
// Variable header
uint8_t var_header[2]; // Message ID
uint8_t utf_topic[TOPIC_MAX_LENGTH+2]; // Topic size (2 bytes), utf-encoded topic
// Fixed header
uint8_t fixed_header[] = {
MQTT_MSG_UNSUBSCRIBE | MQTT_QOS1_FLAG, // Message Type, DUP flag, QoS level, Retain
0x00
};
uint8_t packet[sizeof(var_header)+sizeof(fixed_header)+sizeof(utf_topic)];
var_header[0] = broker->seq>>8;
var_header[1] = broker->seq&0xFF;
if(message_id) { // Returning message id
*message_id = broker->seq;
}
broker->seq++;
// utf topic
memset(utf_topic, 0, sizeof(utf_topic));
utf_topic[0] = topiclen>>8;
utf_topic[1] = topiclen&0xFF;
memcpy(utf_topic+2, topic, topiclen);
fixed_header[1]=sizeof(var_header)+topiclen+2;
memset(packet, 0, sizeof(packet));
memcpy(packet, fixed_header, sizeof(fixed_header));
memcpy(packet+sizeof(fixed_header), var_header, sizeof(var_header));
memcpy(packet+sizeof(fixed_header)+sizeof(var_header), utf_topic, topiclen+2);
//·¢ËÍÊý¾Ý°ü
uart4SendChars(packet,sizeof(fixed_header)+sizeof(var_header)+topiclen+2);
return 1;
}