1. 程式人生 > >GRPC 非同步呼叫 C++

GRPC 非同步呼叫 C++

原文  http://www.sailsxu.com/?p=613

protobuf原生的非同步呼叫

void DoneCallback(PingMessage *response) {
}
void async_test() {
  RpcClient client("127.0.0.1", 8000);
  PingService::Stub stub(client.Channel());
  if (!client.init()) {
    printf("can't connect\n");
  }
  PingMessage request;
  // get time
  gettimeofday(&aync_starttime, NULL
); request.set_time(aync_starttime.tv_sec*1000+int(aync_starttime.tv_usec/1000)); PingMessage* response = new PingMessage(); Closure* callback = NewCallback(&DoneCallback, response); stub.ping(client.Controller(), &request, response, callback); }

原生的protobuf提供的rpc框架是讓我們提供一個回撥,當有對應的響應時(傳送的訊息序列號與接收的相同),呼叫對應的回撥即可;它生成的介面同步和非同步是相同的;
與同步呼叫有相同的問題;由於它只能一個request和response,沒有context和status,所以不能帶額外的引數;也不能表示呼叫成功與否;

GRPC非同步呼叫

class GreeterClient {
 public:
  explicit GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  std::string SayHello(const std::string& user) {
    HelloRequest request;
    request.set_name(user);

    HelloReply reply;

    // Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors. ClientContext context; // The producer-consumer queue we use to communicate asynchronously with the // gRPC runtime. CompletionQueue cq; // Storage for the status of the RPC upon completion. Status status; // 呼叫stub_->AsyncSayHello非同步介面向伺服器傳送請求 std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( stub_->AsyncSayHello(&context, request, &cq)); // 設定rpc完成時動作,結果引數,狀態,和tag(tag用來方便應用層能區分不同的請求,雖然grpc內部有每個請求自己的uid,但是對於應用層卻不能用,通過在這裡設定一個tag,可以方便的區別,因為非同步的可能會多個請求通過rpc發出,接收到後都放到了CompletionQueue中,所以應用層要能區分,就要設定一個tag) rpc->Finish(&reply, &status, (void*)1); void* got_tag; bool ok = false; // CompletionQueue的next會阻塞到有一個結果為止,got_tag會設定成前面的tag,而ok會設定成程式碼是否成功的拿到了響應(ok與status不同,status是表示伺服器的結果,而ok只是表示阻塞佇列中Next的結果) GPR_ASSERT(cq.Next(&got_tag, &ok)); // 通過tag區分不同的響應對應的請求 GPR_ASSERT(got_tag == (void*)1); GPR_ASSERT(ok); // 最終返回值 if (status.ok()) { return reply.message(); } else { return "RPC failed"; } } private: std::unique_ptr<Greeter::Stub> stub_; };

有三點要注意的 :
1. 上面展示了非同步呼叫的使用方式,但是實際工作中不會這麼去用,因為它這樣寫還是一個同步的;
2. 這種非同步的方式是基於CompletionQueue做的,與protobuf自帶的rpc基於回撥的方式完全不同,如果是protobuf rpc,那麼非同步回撥會由rpc內部執行緒來做;而grpc在實際工作中,還是得自己有一個執行緒去呼叫CompletionQueue的next來等待訊息。
3. 這個protobuf rpc方式相差很大,但是思想卻是回撥由誰來呼叫引起的,由rpc執行緒來呼叫,那就要傳回調給內部,如果由使用者執行緒來呼叫,就要由一個佇列先儲存結果;

這裡有一個疑問:
為什麼要自己建立一個reply,而不是讓grpc內部自己建立,一方面增加了一個引數,另一方面也增加了管理的複雜性,比如有多次呼叫非同步介面,那麼為了有在另一個執行緒中使用,要把這個reply物件儲存起來,再根據tag去讀取不同的物件,這樣比較複雜,如果是grpc內部建立,那麼就麼可以把結果儲存到CompletionQueue中,通過next來得到結果。如果只是為了保持誰建立誰銷燬的原則,如果是內部建立,可以通過返回一個std::unique_ptr來儲存可以自動銷燬。

所以總的來說,非同步呼叫介面對於應用層來說還不是不太方便,可以像protobuf rpc一樣介面一個回撥函式(引數就是response, status和tag);這樣更方便一些;

GRPC伺服器非同步

這裡的非同步伺服器是指這個情況:當一個任務需要時間比較長時,不能一直佔用工作執行緒,這時需要啟動另一個執行緒去做,完成之後通知回server的工作執行緒處理;
在這種情況其實和客戶端的需要用非同步的情況一樣,都是不能阻塞執行緒,讓另一個執行緒處理。
如果是自己來實現這樣的邏輯,會怎麼處理呢?
收到非同步呼叫請求-》首先是儲存上下文(比如呼叫引數,client的writer),加入一個佇列中-》在新的執行緒從佇列中取出任務處理-》呼叫writer向client寫資料。

grpc的基本流程:

    // 註冊一個服務,並啟動
    helloworld::Greeter::AsyncService service;
    ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
    builder.RegisterAsyncService(&service);
    auto cq = builder.AddCompletionQueue();
    auto server = builder.BuildAndStart();
    // 建立一個任務,去等待請求(非同步,所以這裡會馬上返回), 當收到一個sayhello的呼叫之後,context,request,responder進行初始化,然後加入佇列中(根據tag來設定)
    ServerContext context;
    HelloRequest request;
    ServerAsyncResponseWriter<HelloReply> responder;
    service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
    // 等待佇列結果,如果tag是上面設定的值,說明這個結果就是得到了請求呼叫,然後就可以開始處理了
    HelloReply reply;
    Status status;
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // set reply and status
      // 建立完成之後,呼叫responder.Finish傳送結果,如果傳送成功,也會以tag為標識加入佇列中
      responder.Finish(reply, status, (void*)2);
    }
    // 處理完成之後,刪除自己
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)2) {
      // clean up
    }

從上面的流程和一般的想法有點不一樣:
它是先呼叫RequestSayHello開啟處理流程(相當於註冊了一個處理器)-》然後非同步請求呼叫到達-》根據RequestSayHello引數加入一個佇列-》從佇列中取出資料處理-》呼叫responder.Finish傳送-》傳送狀態又會入佇列-》清理這次的處理流程。
和同步呼叫比起來,沒有去註冊service,而就直接註冊對應的方法;並且由於是非同步處理,所以伺服器不主動呼叫,所以它會把收到的請求等資訊放入佇列,我們自己要去遍歷;

上面的是基本的流程,下面是一個完整的例子,通過CallData物件,封裝了一個非同步請求的所以動作(這個物件封裝得很好,一種非同步請求一個型別,外界不用關注它的內部實現);

grpc的伺服器非同步呼叫完整程式碼:

class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  // 開始註冊service,並執行
  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service_);
    cq_ = builder.AddCompletionQueue();
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;
    // 開始處理執行緒
    HandleRpcs();
  }

 private:
  // 這個類封裝了非同步請求處理所需要的狀態與邏輯
  // Class encompasing the state and logic needed to serve a request.
  class CallData {
   public:
    // 建立一個calldata物件,然後開始呼叫呼叫對應的函式;這個物件封裝了請求,ctx(用於傳送訊息給軍客戶端)
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // 開始註冊SayHello非同步處理流程,當收到一個sayhello的請求呼叫後,會加入佇列
        status_ = PROCESS;
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
      } else if (status_ == PROCESS) {
        // 這裡又新建立了處理流程,不然可能在處理過程中有請求來就不能及時處理
        new CallData(service_, cq_);
        // 從佇列中拿到請求,真正的處理邏輯,這個會在一個新執行緒中執行
        // The actual processing.
        std::string prefix("Hello ");
        reply_.set_message(prefix + request_.name());
        // 設定狀態
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        // 傳送完成之後入佇列中的資料
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

   private:
    // 非同步service
    Greeter::AsyncService* service_;
    // 一個生產者消費者佇列
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    HelloRequest request_;
    HelloReply reply_;
    // 用於訊息回覆的方法
    ServerAsyncResponseWriter<HelloReply> responder_;

    // Let's implement a tiny state machine with the following states.
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };

  // This can be run in multiple threads if needed.
  void HandleRpcs() {
    // 建立一個新的,由於新建立的status為create,所以它馬上會開始service的RequestSayHello方法
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      // 從
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  Greeter::AsyncService service_;
  std::unique_ptr<Server> server_;
};

int main(int argc, char** argv) {
  ServerImpl server;
  server.Run();

  return 0;
}