一個基於protobuf的極簡RPC
前言
RPC採用客戶機/伺服器模式實現兩個程序之間的相互通訊,socket是RPC經常採用的通訊手段之一。當然,除了socket,RPC還有其他的通訊方法:http、管道。。。網路開源的RPC框架也比較多,一個功能比較完善的RPC框架程式碼比較多,如何快速的從這些程式碼盲海中梳理清楚主要脈絡,對於初學者來說比較困難,本文介紹之前自己實現的一個C++極簡版的RPC框架(https://github.com/goyas/goya-rpc),程式碼只有100多行,希望儘量用少的程式碼來描述框架以減輕初學者的學習負擔,同時便於大家閱讀網路上覆雜的RPC原始碼。
1、經典的RPC框架echo例子裡面,EchoServer_Stub類是哪裡來的?
2、為什麼stub.Echo(&controller, &request, &response, nullptr); 呼叫就執行到server端的Echo函式?
3、stub.Echo(&controller, &request, &response, nullptr); 最後一個引數是nullptr,呼叫到server端的Echo(controller, request, response, done) 函式時,done指標為什麼不為空了?
…
讓我們通過下面這個簡單的RPC框架,一層一層解開上面的疑惑。
echo_server.cc
class EchoServerImpl : public goya::rpc::echo::EchoServer { public: EchoServerImpl() {} virtual ~EchoServerImpl() {} private: virtual void Echo(google::protobuf::RpcController* controller, const goya::rpc::echo::EchoRequest* request, goya::rpc::echo::EchoResponse* response, google::protobuf::Closure* done) { std::cout << "server received client msg: " << request->message() << std::endl; response->set_message( "server say: received msg: ***" + request->message() + std::string("***")); done->Run(); } }; int main(int argc, char* argv[]) { RpcServer rpc_server; goya::rpc::echo::EchoServer* echo_service = new EchoServerImpl(); if (!rpc_server.RegisterService(echo_service, false)) { std::cout << "register service failed" << std::endl; return -1; } std::string server_addr("0.0.0.0:12321"); if (!rpc_server.Start(server_addr)) { std::cout << "start server failed" << std::endl; return -1; } return 0; }
echo_client.cc
int main(int argc, char* argv[]) { echo::EchoRequest request; echo::EchoResponse response; request.set_message("hello tonull, from client"); char* ip = argv[1]; char* port = argv[2]; std::string addr = std::string(ip) + ":" + std::string(port); RpcChannel rpc_channel(addr); echo::EchoServer_Stub stub(&rpc_channel); RpcController controller; stub.Echo(&controller, &request, &response, nullptr); if (controller.Failed()) std::cout << "request failed: %s" << controller.ErrorText().c_str(); else std::cout << "resp: " << response.message() << std::endl; return 0; }
上面是一個簡單的Echo例項的程式碼,主要功能是:server端收到client傳送來的訊息,然後echo返回給client,功能非常簡單,但是走完了整個流程。其他特性無非基於此的一些衍生。好了,我們現在來解析下這個原始碼,首先來看server端。
RpcServer rpc_server; goya::rpc::echo::EchoServer* echo_service = new EchoServerImpl(); rpc_server.RegisterService(echo_service, false) rpc_server.Start(server_addr)
最主要就上面四行程式碼,定義了兩個物件rpc_server和echo_service,然後註冊物件,啟動服務。EchoServerImpl繼承於EchoServer,講到這裡也許有人會問,我沒有定義EchoServer這個類啊,它是從哪裡來的?ok,那我們這裡先跳到講解下protobuf,講完之後再回過頭來繼續。
protobuf
通過socket,client和server可以互相互動訊息,但這種通訊效率不高,一般選擇在傳送的時候把訊息經過序列化,而在接受的時候採用反序列化解析就可以了,本文采用谷歌開源的protobuf作為訊息序列化的方法,其他序列化的方法還有json和rlp。。。
首先按照proto格式,定義訊息傳輸的內容, EchoRequest為請求訊息,EchoRequest為響應訊息,在EchoServer裡面定義了Echo方法。
syntax = "proto3"; package goya.rpc.echo; option cc_generic_services = true; message EchoRequest { string message = 1; } message EchoResponse { string message = 1; } service EchoServer { rpc Echo(EchoRequest) returns(EchoResponse); }
把定義的proto檔案用protoc工具生成對應的echo_service.pb.h和 echo_service.pb.cc檔案,網上有很多介紹怎麼使用proto檔案生成對應的pb.h和pb.c的文件,這裡就不在過多描述。具體的也可以看工程裡面的 sample/echo/CMakeLists.txt 檔案。
service EchoService這一句會生成EchoService和EchoService_Stub兩個類,分別是 server 端和 client 端需要關心的。
回到server
對 server 端,通過EchoService::Echo來處理請求,程式碼未實現,需要子類來 override。
void EchoService::Echo(::google::protobuf::RpcController* controller, const ::echo::EchoRequest*, ::echo::EchoResponse*, ::google::protobuf::Closure* done) { // 程式碼未實現,需要server返回給client什麼內容,就在這裡填寫 controller->SetFailed("Method Echo() not implemented."); done->Run(); }
好了,我們現在回到上面沒有講完的server,server定義了EchoServerImpl物件,實現了Echo方法,功能也就是把client傳送來的訊息又返回給client。 server裡面還沒講解完的是“註冊”和“啟動”服務兩個功能,我們直接跳到程式碼講解。
RegisterService註冊的功能非常簡單,就是把我們自己定義的EchoServerImpl物件echo_service給儲存在services_這個資料結構裡。
bool RpcServerImpl::RegisterService(google::protobuf::Service* service, bool ownership) { services_[0] = service; return true; }
Start啟動服務的功能也很簡單,就是一個socket不斷的accept遠端傳送過來的資料,然後進行處理。
bool RpcServerImpl::Start(std::string& server_addr) { ... while (true) { auto socket = boost::make_shared<boost::asio::ip::tcp::socket>(io); acceptor.accept(*socket); std::cout << "recv from client: " << socket->remote_endpoint().address() << std::endl; int request_data_len = 256; std::vector<char> contents(request_data_len, 0); socket->receive(boost::asio::buffer(contents)); ProcRpcData(std::string(&contents[0], contents.size()), socket); } }
回到client
RpcChannel rpc_channel(addr); echo::EchoServer_Stub stub(&rpc_channel); RpcController controller; stub.Echo(&controller, &request, &response, nullptr);
對於client 端,最主要就上面四條語句,定義了RpcChannel、EchoServer_Stub、RpcController三個不同的物件,通過EchoService_Stub來發送資料,EchoService_Stub::Echo呼叫了::google::protobuf::Channel::CallMethod方法,但是Channel是一個純虛類,需要 RPC 框架在子類裡實現需要的功能。
class EchoService_Stub : public EchoService { ... void Echo(::google::protobuf::RpcController* controller, const ::echo::EchoRequest* request, ::echo::EchoResponse* response, ::google::protobuf::Closure* done); private: ::google::protobuf::RpcChannel* channel_; }; void EchoService_Stub::Echo(::google::protobuf::RpcController* controller, const ::echo::EchoRequest* request, ::echo::EchoResponse* response, ::google::protobuf::Closure* done) { channel_->CallMethod(descriptor()->method(0), controller, request, response, done); }
也就是說,執行stub.Echo(&controller, &request, &response, nullptr); 這條語句實際是執行到了
void RpcChannelImpl::CallMethod(const ::google::protobuf::MethodDescriptor* method, ::google::protobuf::RpcController* controller, const ::google::protobuf::Message* request, ::google::protobuf::Message* response, ::google::protobuf::Closure* done) { std::string request_data = request->SerializeAsString(); socket_->send(boost::asio::buffer(request_data)); int resp_data_len = 256; std::vector<char> resp_data(resp_data_len, 0); socket_->receive(boost::asio::buffer(resp_data)); response->ParseFromString(std::string(&resp_data[0], resp_data.size())); }
RpcChannelImpl::CallMethod主要做了什麼呢?主要兩件事情:1、把request訊息通過socket傳送給遠端;2、同時接受來自遠端的reponse訊息。
講到這裡基本流程就梳理的差不多了,文章開頭的幾個問題也基本在講解的過程中回答了,對於後面兩個問題,這裡再劃重點講解下,stub.Echo(&controller, &request, &response, nullptr); 最後一個引數是nullptr,這裡你填啥都沒啥卵用,因為在RpcChannelImpl::CallMethod中根本就沒使用到,而為什麼又要加這個引數呢?這純屬是為了給人一種錯覺:client端執行stub.Echo(&controller, &request, &response, nullptr);就是呼叫到了server端的EchoServerImpl::Echo(*controller, *request, *response, *done),使遠端呼叫看起來像本地呼叫一樣(至少引數型別及個數是一致的)。而其實這也是最令初學者疑惑的地方。
而本質上,server端的EchoServerImpl::Echo(*controller, *request, *response, *done)函式其實是在接受到資料後,從這裡呼叫過來的,具體見下面程式碼:
void RpcServerImpl::ProcRpcData(const std::string& serialzied_data, const boost::shared_ptr<boost::asio::ip::tcp::socket>& socket) { auto service = services_[0]; auto m_descriptor = service->GetDescriptor()->method(0); auto recv_msg = service->GetRequestPrototype(m_descriptor).New(); auto resp_msg = service->GetResponsePrototype(m_descriptor).New(); recv_msg->ParseFromString(serialzied_data); // 構建NewCallback物件 auto done = google::protobuf::NewCallback( this, &RpcServerImpl::OnCallbackDone, resp_msg, socket); RpcController controller; service->CallMethod(m_descriptor, &controller, recv_msg, resp_msg, done); }
service->CallMethod(m_descriptor, &controller, recv_msg, resp_msg, done); 會呼叫到EchoServer::CallMethod,protobuf會根據method->index()找到對應的執行函式,EchoServerImpl實現了Echo函式,所以上面的service->CallMethod(m_descriptor, &controller, recv_msg, resp_msg, done); 會執行到EchoServerImpl::Echo,這進一步說明了 EchoServerImpl::Echo 跟stub.Echo()呼叫沒有雞毛關係,唯一有的關係,確實發起動作是stub.Echo(); 中間經過了無數次解析最後確實是調到了EchoServerImpl::Echo。
void EchoServer::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method, ::PROTOBUF_NAMESPACE_ID::RpcController* controller, const ::PROTOBUF_NAMESPACE_ID::Message* request, ::PROTOBUF_NAMESPACE_ID::Message* response, ::google::protobuf::Closure* done) { GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_echo_5fservice_2eproto[0]); switch(method->index()) { case 0: Echo(controller, ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::goya::rpc::echo::EchoRequest*>( request), ::PROTOBUF_NAMESPACE_ID::internal::DownCast<::goya::rpc::echo::EchoResponse*>( response), done); break; default: GOOGLE_LOG(FATAL) << "Bad method index; this should never happen."; break; } }
&n