1. 程式人生 > >使用google protobuf RPC實現echo service

使用google protobuf RPC實現echo service

這篇文章將講述如何使用google的protobuf庫實現一個RPC service,就實現一個最簡單的service吧:echo.
文章對應的程式碼都可以在eventrpc中找到,寫下這篇文章時的svn revision是138.

1) 定義協議
首先需要為這個service定義proto檔案, 如下:

package echo;

message EchoRequest
{
  required string message = 1;
};

message EchoResponse
{
  required string response = 1;
};

service EchoService
{
  rpc Echo(EchoRequest) returns (EchoResponse);
};

解釋一下這個proto檔案中做的事情,它定義了一個package: echo, 這個package中有service:EchoService,而這個service下只有一個服務:Echo, 它的請求由EchoRequest結構體定義,回覆由EchoResponse定義.
package相當於是C++中namespace的概念,有些package中可能會提供相同名字的service,為了解決命名衝突,就引入了package這個概念.

2) 對應的C++檔案
使用protobuf自帶的編譯proto檔案編譯器,可以生成對應的pb.h和pb.cc檔案.具體細節可以參考protobuf關於這部分的

參考文件.

所生成的C++檔案,都會在namespace echo中,就是前面提到的package概念.對於service EchoService而言,會對應的生成兩個類:EchoService類和EchoService_Stub類:

class EchoService : public ::google::protobuf::Service {
  // ....
  EchoService_Stub(::google::protobuf::RpcChannel* channel);
  virtual void Echo(::google::protobuf::RpcController* controller,
                       const ::echo::EchoRequest* request,
                       ::echo::EchoResponse* response,
                       ::google::protobuf::Closure* done);
  void CallMethod(const ::google::protobuf::MethodDescriptor* method,
                  ::google::protobuf::RpcController* controller,
                  const ::google::protobuf::Message* request,
                  ::google::protobuf::Message* response,
                  ::google::protobuf::Closure* done);
};

class EchoService_Stub : public EchoService {
//...
  void Echo(::google::protobuf::RpcController* controller,
                       const ::echo::EchoRequest* request,
                       ::echo::EchoResponse* response,
                       ::google::protobuf::Closure* done);
};

上面省略了一些細節,只把最關鍵的部分提取出來了.
這兩部分如何使用,後面會繼續講解

3) 實現客戶端
首先來看如何實現客戶端.
客戶端都通過上面提到的對應service的stub類來發送請求,以sample/echo_client.cpp中的程式碼來解釋:

Dispatcher dispatcher;
RpcChannel *channel = new RpcChannel("127.0.0.1", 21118, &dispatcher);
if (!channel->Connect()) {
  printf("connect to server failed, abort\n");
  exit(-1);
}
echo::EchoService::Stub stub(channel);
echo::EchoRequest request;
echo::EchoResponse response;
request.set_message("hello");
stub.Echo(NULL, &request, &response,
          gpb::NewCallback(::echo_done, &response, channel));

可以看到,stub類的建構函式需要一個::google::protobuf::RpcChannel指標,這個類需要我們來實現,後面繼續說.然後就是根據協議填充請求欄位,註冊回撥函式,這之後就可以呼叫stub類提供的Echo函式傳送請求了.

4) 實現RpcChannel類
現在可以講解RpcChannel類和stub類的關係了,看看在呼叫stub::Echo函式,也就是傳送請求時發生了什麼事情:

void EchoService_Stub::Echo(::google::protobuf::RpcController* controller,
                              const ::echo::EchoRequest* request,
                              ::echo::EchoResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}

可以看到,傳送請求的背後,最後呼叫的其實是RpcChannel的CallMethod函式.所以,要實現RpcChannel類,最關鍵的就是要實現這個函式,在這個函式中完成傳送請求的事務.具體可以看rpc_channel.cpp中的做法,不再闡述,因為這裡面做的事情,和一般的網路客戶端做的事情差不多.

5) 如何識別service
前面提到過,每個service的請求包和回覆包都是protobuf中的message結構體,在這個例子中是EchoRequest和EchoResponse message.可是,它們僅僅是包體,也就是說,即使你傳送了這些訊息,在伺服器端還需要一個包頭來識別到底是哪個請求的包體.
於是在程式碼中,引入了一個類Meta,其中有兩個關鍵的變數:包體長度和method id.
包體長度自不必說,就是緊跟著包頭的包體資料的長度.
method id是用來標識哪一個service的,如果不用id數字,也可以使用字串,每個service,都有一個full name的概念,以這裡的例子而言,Echo服務的full name是echo::EchoService::Echo(再次的,又是C++中namespace的概念來表示”全路徑”以避免命名衝突).但是,如果使用full name來區分,一來發送包頭就會過大,而來查詢service時是一個字串比較操作的過程,耗時間.
所以引入了method id的概念,選擇hash full name為一個id值,一般而言,一個伺服器對外提供的service,撐死有幾百個吧,而選用的id是整型資料,另外再選擇足夠好的hash演算法,絕大多數情況下是不會出現衝突的.
以上就是Meta類做的事情,封裝了包體和識別service的method id,一併作為包頭和包體拼接傳送給伺服器端.

5) 實現伺服器端
接收到客戶端的請求之後,首先要做一些安全性的檢查,比如method id對應的service是否有註冊.
其次就是真正的處理過程了:

int  RpcMethodManager::HandleService(string *message,
                                     Meta *meta, Callback *callback) {
  RpcMethod *rpc_method = rpc_methods_[meta->method_id()];
  const gpb::MethodDescriptor *method = rpc_method->method_;
  gpb::Message *request = rpc_method->request_->New();
  gpb::Message *response = rpc_method->response_->New();
  request->ParseFromString(*message);
  HandleServiceEntry *entry = new HandleServiceEntry(method,
                                                     request,
                                                     response,
                                                     message,
                                                     meta,
                                                     callback);
  gpb::Closure *done = gpb::NewCallback(
      &HandleServiceDone, entry);
  rpc_method->service_->CallMethod(method,
                                   NULL,
                                   request, response, done);
  return 0;
}

上面註冊了一個名為HandleServiceDone的回撥函式,當service的Echo處理完畢之後,自動就會呼叫這個回撥函式
來看 EchoService::CallMethod的定義

void EchoService::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                             ::google::protobuf::RpcController* controller,
                             const ::google::protobuf::Message* request,
                             ::google::protobuf::Message* response,
                             ::google::protobuf::Closure* done) {
  GOOGLE_DCHECK_EQ(method->service(), EchoService_descriptor_);
  switch(method->index()) {
    case 0:
      Echo(controller,
             ::google::protobuf::down_cast<CONST ::echo::EchoRequest*>(request),
             ::google::protobuf::down_cast< ::echo::EchoResponse*>(response),
             done);
      break;
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      break;
  }
}

可以看到, 這個Echo服務是需要註冊的伺服器端首先實現的,以echo_server.cpp中的程式碼為例,它是這樣做的:

class EchoServiceImpl : public echo::EchoService {
 public:
  EchoServiceImpl() {
  };                                                                       

  virtual void Echo(::google::protobuf::RpcController* controller,
                       const ::echo::EchoRequest* request,
                       ::echo::EchoResponse* response,
                       ::google::protobuf::Closure* done) {
    printf ("request: %s\n", request->message().c_str());
    response->set_response(request->message());
    if (done) {
      done->Run();
    }
  }
};

它做的事情就是把收到的請求打印出來,然後將請求訊息作為回覆訊息傳送回去.呼叫done->Run()函式,其實就是呼叫前面註冊的回撥函式HandleServiceDone函式,這時候表示伺服器端已經準備好了給客戶端響應的訊息,後面就是網路傳輸層的事情了.

以上是使用google protobuf RPC實現一個service的全過程.protobuf官方並沒有給出這樣一個demo的例子,所以我在eventrpc專案中試圖封裝protobuf來做RPC service.
但是,當前的實現還不夠完善,存在以下的問題:
1) 效率不高
2) 沒有實現客戶端可以選擇非同步或者同步方式來響應伺服器端的訊息
3) 安全性檢查不夠完善,目前僅適用method id來檢查
4) 沒有把dispatcher抽出來獨立到一個執行緒中,只有這樣才能實現2)
5) 沒有為每個函式寫測試用例.
....
N) 其他還沒有想到的....等著您給建議

不過,就以上而言,如果想了解如何使用protobuf來實現RPC,已經足夠說明原理了,可以對應著程式碼和官方文件看看每個類的含義.
要編譯成功,需要protobuf庫和phread庫.之前曾經使用libevent,但是不喜歡這個東東,於是就自己做了,但是目前僅支援epoll而已,所以還只能在linux上面編譯.