基於Protobuf和Libuv實現RPC
公司要把產品轉架構,雖然和我們驅動開發沒什麼關係,但還是抱著看熱鬧的心態研究了下和架構相關的一些問題。這階段主要研究了下RPC這個東西。
RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之間攜帶資訊資料。在OSI網路通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網路分散式多程式在內的應用程式更加容易。
RPC採用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機呼叫程序傳送一個有程序引數的呼叫資訊到服務程序,然後等待應答資訊。在伺服器端,程序保持睡眠狀態直到呼叫資訊到達為止。當一個呼叫資訊到達,伺服器獲得程序引數,計算結果,傳送答覆資訊,然後等待下一個呼叫資訊,最後,客戶端呼叫程序接收答覆資訊,獲得程序結果,然後呼叫執行繼續進行。
說到底就是在呼叫函式的時候從
call far ptr 目標地址
變為
call ip:port:service:procedure
表現上看和正常函式一樣,當已經不是用本地資源了,而是用來遠端的資源,這種機制正合分散式的思想。
一般RPC分為兩個步驟,一個是序列化和反序列化,另一個是網路通訊,這如果想要高效的實現RPC,這兩部分都要足夠高效,而google正式這樣的效率狂人。去年google釋出了自己的RPC框架grpc,但一方面grpc環境在windows中極難搭建,另一方面grpc在網路通訊這塊是基於HTTP2的,效率總歸不如私有協議來的快,於是就想著自己動手豐衣足食,自己動手做一個rpc。現在基於protobuf的rpc都是結合libevent實現的,所以想試試使用libuv。在序列化/反序列化和網路通訊這塊分別用了google的兩個開源專案:protobuf和libuv,protobuf是單獨的專案,是序列化和反序列化專案,libuv則是整合在nodejs中的,是用CompleteIO實現的高效IO庫。程式碼均下載自github上。
由於工作原因平時習慣了用C語言,所以在protobuf上我們選擇使用他的兄弟工程protobuf-c,這個工程在idl的編譯上使用了protobuf,而序列化和反序列化則使用了c程式碼來實現。libuv因為他的非同步呼叫機制和單執行緒阻塞的工作方式,使得他作為伺服器端是異常高效穩定的,但也這是這個特性使得它不太適合用於開發客戶端。
先說下思路:客戶端首先呼叫了一個函式,這個呼叫轉換為一個網路請求並提供了一個回撥函式,等請求返回時將呼叫這個回撥函式,當這個請求傳送到網路後,服務端解包,找到對應服務的例程並呼叫例程,得到返回值後,將返回值組包傳送給客戶端,客戶端解包後呼叫剛剛的回撥函式。
首先來設計私有協議,協議中需要確定網路包是請求包還是響應包,我們直接在資料包頭兩個位元組設定特徵碼來區別型別:
#define REQUEST_MAGIC "\x90\x90"
#define RESPONSE_MAGIC "\x91\x91"
#define MAGIC_SIZE 2
#define TYPE_UNKNOWN 0
#define TYPE_REQUEST 1
#define TYPE_RESPONSE 2
接下來是頭的設計,protobuf中一個服務是用一個service來修飾,而服務中的例程則是一系列類似函式的宣告。類似於這樣
service RpcTest{
rpc GetTest(Test1) returns(Test2) {}
}
於是便可以以一個服務名和一個例程索引來定位一個具體的例程,同時還需要有一個標識來確定一個響應對應的是哪個請求,所以請求頭可以這樣設計:
message RpcRequest{
required string service = 2;
required int32 request = 3;
required int32 id = 4;
}
而響應頭只需要告訴客戶端這個響應的是哪個請求:
message RpcResponse{
required int32 id = 2;
}
在頭的後面則跟著具體的資料,由於protobuf序列化的二進位制資料中不包含有長度資訊,所以需要我們自己給出長度資訊。這樣私有協議就是這樣的結構:
MAGIC|req_size|packed request|msg_size|packed message
接下來要設計具體的客戶端和伺服器資料結構,伺服器端需要維護一系列的客戶端資訊,而客戶端則需要維護一系列自己傳送的請求:
//一個請求的存根
struct rpcpu_call_stub
{
uint32_t id;
request_return cb;
ProtobufCMessageDescriptor* output;
void* context;
};
//代表一個與客戶端的連結
struct rpcpu_server
{
uv_tcp_t srv_client;
uv_tcp_s* srv_server;
uint8_t status;
};
//代表一個與伺服器端的連結
struct rpcpu_client
{
uv_tcp_t cli_client;
//請求的存根
rpcpu_call_stub stub[100];
uint8_t status;
};
為了尋找到對應的例程,需要使用者將服務登記,這裡我們僅僅是放到一個數組中:
void rpcpu_reg_service(ProtobufCService* service)
{
services[srv_iter++] = service;
}
接下來開始考慮客戶端的傳送請求,傳送請求需要知道哪些引數呢,包括了服務物件,例程名稱,輸入引數和連結資訊以及用於回撥的回撥函式和一個用於儲存狀態的引數。
void rpcpu_send_request(rpcpu_client* connection,
ProtobufCService* rpc,
char* name,
ProtobufCMessage* message,
request_return req_cb,
void* context)
這裡遇到一個問題,就像剛剛說到的那樣,libuv是不適合做客戶端的,因為在其他執行緒上的請求是不能保證請求的回撥函式落在哪個執行緒上,為了確保在loop執行緒上呼叫回撥函式,需要用變通的辦法來將請求傳送到loop執行緒中,很明顯libuv當初考慮到這個問題而設計了uv_async_t來傳遞引數。
Inter-thread communication
這樣我們就要實現一個傳遞機制來把引數傳到loop執行緒,由loop執行緒傳送請求:
//這個回調發生在loop執行緒中
void rpcpu_send_request_cb(uv_async_t* handle)
{
rpcpu_send_request_para* para = (rpcpu_send_request_para*)handle->data;
if (para == 0)
return;
rpcpu_send_request_internal(para->connection, para->rpc, para->name, para->message, para->req_cb, para->context);
free(para);
}
//這個函式可以在任意執行緒呼叫
void rpcpu_send_request(rpcpu_client* connection,
ProtobufCService* rpc,
char* name,
ProtobufCMessage* message,
request_return req_cb,
void* context)
{
rpcpu_send_request_para* para = (rpcpu_send_request_para*)malloc(sizeof(rpcpu_send_request_para));
para->connection = connection;
para->rpc = rpc;
para->name = name;
para->message = message;
para->req_cb = req_cb;
para->context = context;
send_async.data = para;
uv_async_send(&send_async);
}
//具體的請求傳送函式
void rpcpu_send_request_internal(rpcpu_client* connection,
ProtobufCService* rpc,
char* name,
ProtobufCMessage* message,
request_return req_cb,
void* context)
{...}
請求的組包分為這幾個過程,建立請求頭,建立請求存根,序列化,傳送資料:
void rpcpu_send_request_internal(rpcpu_client* connection,
ProtobufCService* rpc,
char* name,
ProtobufCMessage* message,
request_return req_cb,
void* context)
{
//create req_request object
RpcRequest req = RPC_REQUEST__INIT;
for (int i = 0;i < rpc->descriptor->n_methods;i++)
{
if (!strcmp(rpc->descriptor->methods[i].name, name))
{
req.request = i;
break;
}
}
req.service = (char*)malloc(strlen(rpc->descriptor->name) + 1);
strcpy(req.service, rpc->descriptor->name);
//save the request stub
for (int i = 0;i < 100;i++)
{
if (connection->stub[i].id == 0xFFFFFFFF)
{
InterlockedExchange(&connection->stub[i].id, i);
req.id = i;
connection->stub[i].output = (ProtobufCMessageDescriptor*)rpc->descriptor->methods[req.request].output;
connection->stub[i].context = context;
connection->stub[i].cb = req_cb;
break;
}
}
//get the total send buffer size
size_t req_size = rpc_request__get_packed_size(&req);
size_t msg_size = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(message));
//group the packet
char* send_buf = (char*)malloc(req_size + msg_size + MAGIC_SIZE + 4);
char* iter = send_buf;
//packet format:
//REQUEST_MAGIC|req_size|packed request|msg_size|packed message
memcpy(iter, REQUEST_MAGIC, MAGIC_SIZE);
iter += MAGIC_SIZE;
*(uint16_t*)iter = req_size;
iter += 2;
rpc_request__pack(&req, (uint8_t*)iter);
iter += req_size;
*(uint16_t*)iter = msg_size;
iter += 2;
protobuf_c_message_pack((const ProtobufCMessage*)message, (uint8_t*)iter);
ProtobufCMessage* ss = protobuf_c_message_unpack((const ProtobufCMessageDescriptor*)message, NULL, msg_size, (uint8_t*)iter);
//send it
uv_buf_t uv_buf = uv_buf_init(send_buf, req_size + msg_size + MAGIC_SIZE);
uv_write_t* writer = (uv_write_t*)malloc(sizeof(uv_write_t));
writer->data = send_buf;
uv_write(writer, (uv_stream_t*)&connection->cli_client, &uv_buf, 1, write_cb);
//free the resource
free(req.service);
}
客戶端傳送請求後,伺服器就需要收包並解析了,解析函式是伺服器端與客戶端通用的,解包後呼叫對應的服務例程:
rpcpu_closure_data closure_data;
closure_data.req = request;
closure_data.srv = srv;
service->invoke(service, request->request, rpc_msg, rpcpu_common_closure, &closure_data);
這兒用到了一個protobuf現成的機制,之前想了半天怎麼把使用者服務的返回值傳出來,後來發現protobuf的回撥ProtobufCClosure應該就是用於這個目的吧。也就是說使用者的回撥函式必須呼叫我們的rpcpu_common_closure才能取到返回值。
void rpcpu_common_closure(const ProtobufCMessage *msg, void *closure_data)
{
rpcpu_closure_data* closure = (rpcpu_closure_data*)closure_data;
rpcpu_send_response(closure->srv, closure->req, (ProtobufCMessage*)msg);
}
響應包的組包和請求包基本相同,不再多述。只是使用者的響應回撥需要放到其他執行緒中以免阻塞loop執行緒:
uv_work_t* work = (uv_work_t*)malloc(sizeof(uv_work_t));
work->data = stub;
uv_queue_work(uv_default_loop(), work, rpcpu_worker_return_cb, 0);
為了更像真實的函式呼叫,可以將傳送請求的函式作一個同步版本:
void rpcpu_common_request_return(ProtobufCMessageDescriptor* message, void* context)
{
rpcpu_sync_context* sync_ctx = (rpcpu_sync_context*)context;
sync_ctx->msg = (ProtobufCMessage*)message;
SetEvent((HANDLE)sync_ctx->event);
}
ProtobufCMessage* rpcpu_send_request_sync(rpcpu_client* connection,
ProtobufCService* rpc,
char* name,
ProtobufCMessage* message)
{
rpcpu_send_request_para* para = (rpcpu_send_request_para*)malloc(sizeof(rpcpu_send_request_para));
para->connection = connection;
para->rpc = rpc;
para->name = name;
para->message = message;
para->req_cb = rpcpu_common_request_return;
rpcpu_sync_context sync_ctx;
sync_ctx.event = CreateEvent(NULL, FALSE, FALSE, NULL);
sync_ctx.msg = 0;
para->context = &sync_ctx;
send_async.data = para;
uv_async_send(&send_async);
WaitForSingleObject(para->context, INFINITE);
CloseHandle((HANDLE)sync_ctx.event);
return sync_ctx.msg;
}