1. 程式人生 > >Paho MQTT 嵌入式c客戶端研究筆記 (二)

Paho MQTT 嵌入式c客戶端研究筆記 (二)

  paho.mqtt.embedded-c-master\MQTTPacket\samples,這個目錄裡面封裝了釋出訊息、訂閱訊息的示例。執行pub0sub1,這個示例裡面會去訂閱主題訊息、釋出主題訊息。並且訂閱和釋出的訊息是同一個主題,所以在執行過程中會看到迴圈列印同一份訊息。程式碼如下:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include "MQTTPacket.h"
#include "transport.h"

/* This is in order to get an asynchronous signal to stop the sample,
as the code loops waiting for msgs on the subscribed topic.
Your actual code will depend on your hw and approach*/
#include <signal.h> int toStop = 0; void cfinish(int sig) { signal(SIGINT, NULL); toStop = 1; } void stop_init(void) { signal(SIGINT, cfinish); signal(SIGTERM, cfinish); } /* */ int main(int argc, char *argv[]) { MQTTPacket_connectData data = MQTTPacket_connectData_initializer; int
rc = 0; int mysock = 0; unsigned char buf[200]; int buflen = sizeof(buf); int msgid = 1; MQTTString topicString = MQTTString_initializer; int req_qos = 0; char* payload = "mypayload"; int payloadlen = strlen(payload); int len = 0; char *host = "m2m.eclipse.org"; int
port = 1883; stop_init(); if (argc > 1) host = argv[1]; if (argc > 2) port = atoi(argv[2]); mysock = transport_open(host, port); if(mysock < 0) return mysock; printf("Sending to hostname %s port %d\n", host, port); data.clientID.cstring = "me"; data.keepAliveInterval = 20; data.cleansession = 1; data.username.cstring = "testuser"; data.password.cstring = "testpassword"; //連線伺服器 len = MQTTSerialize_connect(buf, buflen, &data); rc = transport_sendPacketBuffer(mysock, buf, len); /* wait for connack */ if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK) { unsigned char sessionPresent, connack_rc; if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) { printf("Unable to connect, return code %d\n", connack_rc); goto exit; } } else goto exit; /* subscribe 訂閱主題訊息*/ topicString.cstring = "substopic"; len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); rc = transport_sendPacketBuffer(mysock, buf, len); if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK) /* wait for suback */ { unsigned short submsgid; int subcount; int granted_qos; rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); if (granted_qos != 0) { printf("granted qos != 0, %d\n", granted_qos); goto exit; } } else goto exit; /* loop getting msgs on subscribed topic迴圈讀取訊息 */ topicString.cstring = "pubtopic"; while (!toStop) { /* transport_getdata() has a built-in 1 second timeout, your mileage will vary */ if (MQTTPacket_read(buf, buflen, transport_getdata) == PUBLISH) { unsigned char dup; int qos; unsigned char retained; unsigned short msgid; int payloadlen_in; unsigned char* payload_in; int rc; MQTTString receivedTopic; rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic, &payload_in, &payloadlen_in, buf, buflen); printf("message arrived %.*s\n", payloadlen_in, payload_in); } printf("publishing reading\n"); //下面兩行是用來發布訊息。這裡釋出,上面訂閱,就形成了一個迴圈。 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen); rc = transport_sendPacketBuffer(mysock, buf, len); } printf("disconnecting\n"); len = MQTTSerialize_disconnect(buf, buflen); rc = transport_sendPacketBuffer(mysock, buf, len); exit: transport_close(mysock); return 0; }

在這個示例中,改造一下,把釋出訊息的程式碼給註釋掉。我們發現,過了一會,迴圈讀訊息的地方就再也接收不到伺服器傳送過來的訊息了。推測是此時客戶端與伺服器之間的長連線已經斷了。回到程式碼,

//設定心跳包間隔時間
data.keepAliveInterval = 20;

Keep Alive timer
The Keep Alive timer is present in the variable header of a MQTT CONNECT message.

The Keep Alive timer, measured in seconds, defines the maximum time interval between >messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout. The client has a responsibility to send a message within each Keep Alive time period. In the absence of a data-related message during the time period, the client sends a PINGREQ message, which the server acknowledges with a PINGRESP message.

If the server does not receive a message from the client within one and a half times the Keep Alive time period (the client is allowed “grace” of half a time period), it disconnects the client as if the client had sent a DISCONNECT message. This action does not impact any of the client’s subscriptions. See DISCONNECT for more details.

If a client does not receive a PINGRESP message within a Keep Alive time period after sending a PINGREQ, it should close the TCP/IP socket connection.

The Keep Alive timer is a 16-bit value that represents the number of seconds for the time period. The actual value is application-specific, but a typical value is a few minutes. The maximum value is approximately 18 hours. A value of zero (0) means the client is not disconnected.

The format of the Keep Alive timer is shown in the table below. The ordering of the 2 bytes of the Keep Alive Timer is MSB, then LSB (big-endian).

  pub0sub1示例程式碼中,刪除釋出訊息程式碼塊只保留訂閱訊息功能以後,客戶端在20秒之內都可以接收到伺服器推送過來的訊息。如果20秒內客戶端沒有向伺服器傳送PINGREQ訊息,那麼伺服器會關閉掉TCP/IP埠連線。
  因此,如果希望開啟一個可以永遠保持訂閱訊息的客戶端,需要在設定的心跳間隔時間內向伺服器傳送PINGREQ訊息。具體做法可以這樣,程式碼如下:

while (!toStop)
    {

        /****迴圈訂閱訊息*************/
        topicString.cstring = "pubtopic";
        len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

        rc = transport_sendPacketBuffer(mysock, buf, len);
        printf("publishing reading000\n");
        if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK)  /* wait for suback */
        {
            unsigned short submsgid;
            int subcount;
            int granted_qos;

            rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
            if (granted_qos != 0)
            {
                printf("granted qos != 0, %d\n", granted_qos);
                goto exit;
            }
        }
        else
            goto exit;
        /*****************/
        /* transport_getdata() has a built-in 1 second timeout,
        your mileage will vary */
        int state = MQTTPacket_read(buf, buflen, transport_getdata);
        //printf("state is = %d\n", state);
        //printf("PUBLISH is = %d\n", PUBLISH);
        if (state != -1){
                    printf("state2 is = %d\n", state);
            }
        if (state == PUBLISH)
        {
            unsigned char dup;
            int qos;
            unsigned char retained;
            unsigned short msgid;
            int payloadlen_in;
            unsigned char* payload_in;
            int rc;
            MQTTString receivedTopic;

            rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
                    &payload_in, &payloadlen_in, buf, buflen);
            printf("message arrived %.*s\n", payloadlen_in, payload_in);            
        }

        //printf("publishing reading\n");
        //len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
        //rc = transport_sendPacketBuffer(mysock, buf, len);
    }

  我們在迴圈讀取訊息的程式碼塊中加入了訂閱訊息,這樣可以保持客戶端與伺服器之間長連線不會斷開。當然,這麼做效果並不好,因為迴圈傳送訂閱訊息會對伺服器產生比較多的負載。可以對程式碼做個優化,比如每間隔10秒鐘傳送一次訂閱訊息的請求。百度雲官方提供的客戶端程式碼就是這麼實現的,下一次我們拿出來做對比。文件程式碼點這裡