1. 程式人生 > >rpc框架yar之源碼解析- 協議和傳輸

rpc框架yar之源碼解析- 協議和傳輸

data- register lin sent request 釋放空間 整型 htonl 開始

Yar 協議頭和傳輸源碼分析
 這篇博客主要學習rpc框架yar的協議頭和傳輸的實現, 能力有限,有些語句沒有看懂,所以猜測了一部分。

yar_header_t的實現

_yar_header的定義主要在yar_protocol.h和yar_protocol.c裏面,下面介紹這兩個文件裏的源碼。
借用網上的一幅圖片,請求體包括 yar_header + packager_name + yar_request_t 這三個部分,返回類似。
下面主要介紹yar_header的部分。

技術分享圖片

?

yar_protocol.h


// 定義yar_header_t這個結構體的內容,一共82個字節。
typedef struct _yar_header {
    unsigned int   id;                        // 4
    unsigned short version;             // 2
    unsigned int   magic_num;        // 4
    unsigned int   reserved;            // 4
    unsigned char  provider[32];    // 32
    unsigned char  token[32];        // 32
    unsigned int   body_len;          //  4
}

// 下面是聲明兩個函數,稍後在.c文件中說明具體方法
yar_header_t * php_yar_protocol_parse(char *payload);
void php_yar_protocol_render(yar_header_t *header, uint id, char *provider, char *token, uint body_len, uint reserved);

yar_protocol.c


// 這個函數的作用應該是給 header指向的結構體賦值
/* htonl函數介紹:將主機數轉換成無符號長整型的網絡字節順序。
 *                           本函數將一個32位數從主機字節順序轉換成網絡字節順序。
 */
void php_yar_protocol_render(yar_header_t *header, uint id, char *provider, char *token, uint body_len, uint reserved) /* {{{ */ {
    header->magic_num = htonl(YAR_PROTOCOL_MAGIC_NUM);
    header->id = htonl(id);
    header->body_len = htonl(body_len);
    header->reserved = htonl(reserved);
    if (provider) {
        memcpy(header->provider, provider, strlen(provider));
    }
    if (token) {
        memcpy(header->token, token, strlen(token));
    }
    return;
} /* }}} */

// 這個函數是將payload指向的字符串,轉成header結構體對應的內容 然後返回。
yar_header_t * php_yar_protocol_parse(char *payload) /* {{{ */ {
    yar_header_t *header = (yar_header_t *)payload;

    header->magic_num = ntohl(header->magic_num);

    if (header->magic_num != YAR_PROTOCOL_MAGIC_NUM) {
        header->magic_num = htonl(header->magic_num);
        return NULL;
    }

    header->id = ntohl(header->id);
    header->body_len = ntohl(header->body_len);
    header->reserved = ntohl(header->reserved);

    return header;
} /* }}} */

Q: 下面的這句 我沒有看明白, 感覺直接返回就可以了。

if (header->magic_num != YAR_PROTOCOL_MAGIC_NUM) {
        header->magic_num = htonl(header->magic_num);
        return NULL;
    }


yar傳輸源碼分析

關於yar傳輸方面源碼分析,還是先從下面的圖開始吧。感謝作圖的人,比我說的直觀。

技術分享圖片

傳輸層主要涉及到如下幾個文件:
transports/curl.c、transports/socket.c 、yar_transport.h、yar_transport.c 4個文件。

yar_transport.h


yar_transport.h 主要定義兩個函數和一些結構體。

// 這個應該是傳輸方式的方法定義,後面會在curl.c socket.c中實現下面的內容
typedef struct _yar_transport_interface {
    void *data;
    int  (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg);
    int  (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg);
    struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request);
    int  (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition);
    int  (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata);
    void (*close)(struct _yar_transport_interface *self);
} yar_transport_interface_t;

typedef struct _yar_transport {
    const char *name;
    struct _yar_transport_interface * (*init)();
    void (*destroy)(yar_transport_interface_t *self);
    yar_transport_multi_t *multi;
} yar_transport_t;

// 下面是並行調用時的結構體定義
typedef struct _yar_transport_multi_interface {
    void *data;
    int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp);
    int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback);
    void (*close)(struct _yar_transport_multi_interface *self);
} yar_transport_multi_interface_t;

typedef struct _yar_transport_multi {
    struct _yar_transport_multi_interface * (*init)();
} yar_transport_multi_t;

// 根據名稱獲得對應的傳輸方式,curl / sock
PHP_YAR_API const yar_transport_t * php_yar_transport_get(char *name, int nlen);

// 註冊傳輸方式到yar_transports_list
PHP_YAR_API int php_yar_transport_register(const yar_transport_t *transport);

yar_transport.c


這個文件主要實現上面兩個函數,即註冊curl和socket兩種方式,然後根據name獲得傳輸方式。

// 定義list
struct _yar_transports_list {
    unsigned int size;
    unsigned int num;
    const yar_transport_t **transports;
} yar_transports_list;

// 註冊方法到list裏面
PHP_YAR_API int php_yar_transport_register(const yar_transport_t *transport) /* {{{ */ {

    if (!yar_transports_list.size) {
       yar_transports_list.size = 5;
       yar_transports_list.transports = (const yar_transport_t **)malloc(sizeof(yar_transport_t *) * yar_transports_list.size);
    } else if (yar_transports_list.num == yar_transports_list.size) {
       yar_transports_list.size += 5;
       yar_transports_list.transports = (const yar_transport_t **)realloc(yar_transports_list.transports, sizeof(yar_transport_t *) * yar_transports_list.size);
    }
    yar_transports_list.transports[yar_transports_list.num] = transport;

    return yar_transports_list.num++;
} /* }}} */

// 根據name獲得方式
PHP_YAR_API const yar_transport_t * php_yar_transport_get(char *name, int nlen) /* {{{ */ {
    int i = 0;
    for (;i<yar_transports_list.num;i++) {
        if (strncmp(yar_transports_list.transports[i]->name, name, nlen) == 0) {
            return yar_transports_list.transports[i];
        }
    }

    return NULL;
} /* }}} */

// 下面這兩句沒有看懂什麽意思
le_calldata = zend_register_list_destructors_ex(php_yar_calldata_dtor, NULL, "Yar Call Data", module_number);
le_plink = zend_register_list_destructors_ex(NULL, php_yar_plink_dtor, "Yar Persistent Link", module_number);

transports/socket.c


主要是socket方式需要的方法的具體實現, 先看下面方法
用yar_transport_t 定義一個常量

const yar_transport_t yar_transport_socket = {
    "sock",
    php_yar_socket_init,
    php_yar_socket_destroy,
    NULL
}; /* }}} */

php_yar_socket_init的定義如下

yar_transport_interface_t * php_yar_socket_init() /* {{{ */ {
    yar_socket_data_t *data;
    yar_transport_interface_t *self;

    self = emalloc(sizeof(yar_transport_interface_t));
    self->data = data = ecalloc(1, sizeof(yar_socket_data_t));

    self->open      = php_yar_socket_open;
    self->send      = php_yar_socket_send;
    self->exec      = php_yar_socket_exec;
    self->setopt    = php_yar_socket_setopt;
    self->calldata  = NULL;
    self->close     = php_yar_socket_close;

    return  self;
} /* }}} */

然後我們再看yar_socket_data_t的定義, 其實主要使用php_stream這個流類型 底層的具體實現不追了。

typedef struct _yar_socket_data_t {
    char persistent;
    php_stream *stream;  // 主要
} yar_socket_data_t;

下面是php_yar_socket_open函數的實現

php_stream *stream = NULL;
...
// 這個是核心代碼,應該是根據地址和配置創建一個流的對象。
stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);

php_yar_socket_close, 這個函數就是關閉流,然後釋放空間

if (!data->persistent && data->stream) {
        php_stream_close(data->stream);
    }

php_yar_socket_send 這個函數主要是把需要發送的內容,按照一定格式pack,然後放到stream中。
裏面有幾個函數沒有看懂,以後在詳細查找:

// 先構造header的內容
php_yar_protocol_render(&header, request->id, "Yar PHP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0);

// 復制到buf這個變量中
memcpy(buf, (char *)&header, sizeof(yar_header_t));

// 這個地方 我不太了解為什麽要用goto
wait_io:
        if (bytes_left) {
            retval = php_select(fd+1, NULL, &rfds, NULL, &tv);

            if (retval == -1) {
                zend_string_release(payload);
                spprintf(msg, 0, "select error ‘%s‘", strerror(errno));
                return 0;
            } else if (retval == 0) {
                zend_string_release(payload);
                spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout));
                return 0;
            }

            if (PHP_SAFE_FD_ISSET(fd, &rfds)) {
                if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) {
                    bytes_left -= ret;
                    bytes_sent += ret;
                }
            }
            goto wait_io;
        }
    }

transports/curl.c


curl.c跟socket類似,依賴curl/curl.h文件裏的內容,如

CURL        *cp;

rpc框架yar之源碼解析- 協議和傳輸