1. 程式人生 > >一個基於protobuf的極簡RPC

一個基於protobuf的極簡RPC

前言

RPC採用客戶機/伺服器模式實現兩個程序之間的相互通訊,socket是RPC經常採用的通訊手段之一。當然,除了socket,RPC還有其他的通訊方法:http、管道。。。網路開源的RPC框架也比較多,一個功能比較完善的RPC框架程式碼比較多,如何快速的從這些程式碼盲海中梳理清楚主要脈絡,對於初學者來說比較困難,本文介紹之前自己實現的一個C++極簡版的RPC框架(https://github.com/goyas/goya-rpc),程式碼只有100多行,希望儘量用少的程式碼來描述框架以減輕初學者的學習負擔,同時便於大家閱讀網路上覆雜的RPC原始碼。

1、經典的RPC框架echo例子裡面,EchoServer_Stub類是哪裡來的?
2、為什麼stub.Echo(&controller, &request, &response, nullptr); 呼叫就執行到server端的Echo函式?
3、stub.Echo(&controller, &request, &response, nullptr); 最後一個引數是nullptr,呼叫到server端的Echo(controller, request, response, done) 函式時,done指標為什麼不為空了?

讓我們通過下面這個簡單的RPC框架,一層一層解開上面的疑惑。

echo_server.cc

class EchoServerImpl : public goya::rpc::echo::EchoServer {
public:
  EchoServerImpl() {}
  virtual ~EchoServerImpl() {}

private:
  virtual void Echo(google::protobuf::RpcController* controller,
                    const goya::rpc::echo::EchoRequest* request,
                    goya::rpc::echo::EchoResponse* response,
                    google::protobuf::Closure* done) 
  {
    std::cout << "server received client msg: " << request->message() << std::endl;
    response->set_message(
      "server say: received msg: ***" + request->message() + std::string("***"));
    done->Run();
  }
};

int main(int argc, char* argv[]) 
{
  RpcServer rpc_server;

  goya::rpc::echo::EchoServer* echo_service = new EchoServerImpl();
  if (!rpc_server.RegisterService(echo_service, false)) {
    std::cout << "register service failed" << std::endl;
    return -1;
  }

  std::string server_addr("0.0.0.0:12321");
  if (!rpc_server.Start(server_addr)) {
    std::cout << "start server failed" << std::endl;
    return -1;
  }

  return 0;
}

 

echo_client.cc

int main(int argc, char* argv[]) 
{ 
  echo::EchoRequest   request;
  echo::EchoResponse  response;
  request.set_message("hello tonull, from client");

  char* ip          = argv[1];
  char* port        = argv[2];
  std::string addr  = std::string(ip) + ":" + std::string(port);
  RpcChannel    rpc_channel(addr);
  echo::EchoServer_Stub stub(&rpc_channel);

  RpcController controller;
  stub.Echo(&controller, &request, &response, nullptr);
  
  if (controller.Failed()) 
    std::cout << "request failed: %s" << controller.ErrorText().c_str();
  else
    std::cout << "resp: " << response.message() << std::endl;

  return 0;
}

 

上面是一個簡單的Echo例項的程式碼,主要功能是:server端收到client傳送來的訊息,然後echo返回給client,功能非常簡單,但是走完了整個流程。其他特性無非基於此的一些衍生。好了,我們現在來解析下這個原始碼,首先來看server端。

RpcServer rpc_server;
goya::rpc::echo::EchoServer* echo_service = new EchoServerImpl();
rpc_server.RegisterService(echo_service, false)
rpc_server.Start(server_addr)

 

最主要就上面四行程式碼,定義了兩個物件rpc_server和echo_service,然後註冊物件,啟動服務。EchoServerImpl繼承於EchoServer,講到這裡也許有人會問,我沒有定義EchoServer這個類啊,它是從哪裡來的?ok,那我們這裡先跳到講解下protobuf,講完之後再回過頭來繼續。

protobuf

通過socket,client和server可以互相互動訊息,但這種通訊效率不高,一般選擇在傳送的時候把訊息經過序列化,而在接受的時候採用反序列化解析就可以了,本文采用谷歌開源的protobuf作為訊息序列化的方法,其他序列化的方法還有json和rlp。。。

首先按照proto格式,定義訊息傳輸的內容, EchoRequest為請求訊息,EchoRequest為響應訊息,在EchoServer裡面定義了Echo方法。

syntax = "proto3";
package goya.rpc.echo;
option cc_generic_services = true;

message EchoRequest {
  string message = 1;
}
message EchoResponse {
  string message = 1;
}
service EchoServer {
  rpc Echo(EchoRequest) returns(EchoResponse);
}

 

把定義的proto檔案用protoc工具生成對應的echo_service.pb.h和 echo_service.pb.cc檔案,網上有很多介紹怎麼使用proto檔案生成對應的pb.h和pb.c的文件,這裡就不在過多描述。具體的也可以看工程裡面的 sample/echo/CMakeLists.txt 檔案。

service EchoService這一句會生成EchoService和EchoService_Stub兩個類,分別是 server 端和 client 端需要關心的。

回到server

對 server 端,通過EchoService::Echo來處理請求,程式碼未實現,需要子類來 override。

void EchoService::Echo(::google::protobuf::RpcController* controller,
                         const ::echo::EchoRequest*,
                         ::echo::EchoResponse*,
                         ::google::protobuf::Closure* done) {
  // 程式碼未實現,需要server返回給client什麼內容,就在這裡填寫
  controller->SetFailed("Method Echo() not implemented.");
  done->Run();
}

 

好了,我們現在回到上面沒有講完的server,server定義了EchoServerImpl物件,實現了Echo方法,功能也就是把client傳送來的訊息又返回給client。 server裡面還沒講解完的是“註冊”和“啟動”服務兩個功能,我們直接跳到程式碼講解。

RegisterService註冊的功能非常簡單,就是把我們自己定義的EchoServerImpl物件echo_service給儲存在services_這個資料結構裡。

bool RpcServerImpl::RegisterService(google::protobuf::Service* service, bool ownership) {
  services_[0] = service;
  return true;
}

 

Start啟動服務的功能也很簡單,就是一個socket不斷的accept遠端傳送過來的資料,然後進行處理。

bool RpcServerImpl::Start(std::string& server_addr) {
  ...
  while (true) {
    auto socket = boost::make_shared<boost::asio::ip::tcp::socket>(io);
    acceptor.accept(*socket);

    std::cout << "recv from client: " << socket->remote_endpoint().address() << std::endl;

    int request_data_len = 256;
    std::vector<char> contents(request_data_len, 0);
    socket->receive(boost::asio::buffer(contents));

    ProcRpcData(std::string(&contents[0], contents.size()), socket);
  }
}

 

回到client

RpcChannel    rpc_channel(addr);
echo::EchoServer_Stub stub(&rpc_channel);
RpcController controller;
stub.Echo(&controller, &request, &response, nullptr);

 

對於client 端,最主要就上面四條語句,定義了RpcChannel、EchoServer_Stub、RpcController三個不同的物件,通過EchoService_Stub來發送資料,EchoService_Stub::Echo呼叫了::google::protobuf::Channel::CallMethod方法,但是Channel是一個純虛類,需要 RPC 框架在子類裡實現需要的功能。

class EchoService_Stub : public EchoService {
  ...
  void Echo(::google::protobuf::RpcController* controller,
                       const ::echo::EchoRequest* request,
                       ::echo::EchoResponse* response,
                       ::google::protobuf::Closure* done);
 private:
    ::google::protobuf::RpcChannel* channel_;
};

void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,
                              const ::echo::EchoRequest* request,
                              ::echo::EchoResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0), controller, request, response, done);
}

 

也就是說,執行stub.Echo(&controller, &request, &response, nullptr); 這條語句實際是執行到了

void RpcChannelImpl::CallMethod(const ::google::protobuf::MethodDescriptor* method, 
  ::google::protobuf::RpcController* controller,
  const ::google::protobuf::Message* request,
  ::google::protobuf::Message* response,
  ::google::protobuf::Closure* done) {
  std::string request_data = request->SerializeAsString();
  socket_->send(boost::asio::buffer(request_data));

  int  resp_data_len = 256;
  std::vector<char> resp_data(resp_data_len, 0);
  socket_->receive(boost::asio::buffer(resp_data));

  response->ParseFromString(std::string(&resp_data[0], resp_data.size()));
}

 

RpcChannelImpl::CallMethod主要做了什麼呢?主要兩件事情:1、把request訊息通過socket傳送給遠端;2、同時接受來自遠端的reponse訊息。

講到這裡基本流程就梳理的差不多了,文章開頭的幾個問題也基本在講解的過程中回答了,對於後面兩個問題,這裡再劃重點講解下,stub.Echo(&controller, &request, &response, nullptr); 最後一個引數是nullptr,這裡你填啥都沒啥卵用,因為在RpcChannelImpl::CallMethod中根本就沒使用到,而為什麼又要加這個引數呢?這純屬是為了給人一種錯覺:client端執行stub.Echo(&controller, &request, &response, nullptr);就是呼叫到了server端的EchoServerImpl::Echo(*controller, *request, *response, *done),使遠端呼叫看起來像本地呼叫一樣(至少引數型別及個數是一致的)。而其實這也是最令初學者疑惑的地方。

而本質上,server端的EchoServerImpl::Echo(*controller, *request, *response, *done)函式其實是在接受到資料後,從這裡呼叫過來的,具體見下面程式碼:

void RpcServerImpl::ProcRpcData(const std::string& serialzied_data,
  const boost::shared_ptr<boost::asio::ip::tcp::socket>& socket) {
  auto service      = services_[0];
  auto m_descriptor = service->GetDescriptor()->method(0);
  auto recv_msg = service->GetRequestPrototype(m_descriptor).New();
  auto resp_msg = service->GetResponsePrototype(m_descriptor).New();
  recv_msg->ParseFromString(serialzied_data);
  
  // 構建NewCallback物件
  auto done = google::protobuf::NewCallback(
    this, &RpcServerImpl::OnCallbackDone, resp_msg, socket);
  RpcController controller;
  service->CallMethod(m_descriptor, &controller, recv_msg, resp_msg, done);
}

 

service->CallMethod(m_descriptor, &controller, recv_msg, resp_msg, done); 會呼叫到EchoServer::CallMethod,protobuf會根據method->index()找到對應的執行函式,EchoServerImpl實現了Echo函式,所以上面的service->CallMethod(m_descriptor, &controller, recv_msg, resp_msg, done); 會執行到EchoServerImpl::Echo,這進一步說明了 EchoServerImpl::Echo 跟stub.Echo()呼叫沒有雞毛關係,唯一有的關係,確實發起動作是stub.Echo(); 中間經過了無數次解析最後確實是調到了EchoServerImpl::Echo。

void EchoServer::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
                             ::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                             const ::PROTOBUF_NAMESPACE_ID::Message* request,
                             ::PROTOBUF_NAMESPACE_ID::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_echo_5fservice_2eproto[0]);
  switch(method->index()) {
    case 0:
      Echo(controller,
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::goya::rpc::echo::EchoRequest*>(
                 request),
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<::goya::rpc::echo::EchoResponse*>(
                 response),
             done);
      break;
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      break;
  }
}

&n