GRPC 非同步呼叫 C++
原文 http://www.sailsxu.com/?p=613
protobuf原生的非同步呼叫
void DoneCallback(PingMessage *response) {
}
void async_test() {
RpcClient client("127.0.0.1", 8000);
PingService::Stub stub(client.Channel());
if (!client.init()) {
printf("can't connect\n");
}
PingMessage request;
// get time
gettimeofday(&aync_starttime, NULL );
request.set_time(aync_starttime.tv_sec*1000+int(aync_starttime.tv_usec/1000));
PingMessage* response = new PingMessage();
Closure* callback = NewCallback(&DoneCallback, response);
stub.ping(client.Controller(), &request, response, callback);
}
原生的protobuf提供的rpc框架是讓我們提供一個回撥,當有對應的響應時(傳送的訊息序列號與接收的相同),呼叫對應的回撥即可;它生成的介面同步和非同步是相同的;
與同步呼叫有相同的問題;由於它只能一個request和response,沒有context和status,所以不能帶額外的引數;也不能表示呼叫成功與否;
GRPC非同步呼叫
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
std::string SayHello(const std::string& user) {
HelloRequest request;
request.set_name(user);
HelloReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq;
// Storage for the status of the RPC upon completion.
Status status;
// 呼叫stub_->AsyncSayHello非同步介面向伺服器傳送請求
std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
stub_->AsyncSayHello(&context, request, &cq));
// 設定rpc完成時動作,結果引數,狀態,和tag(tag用來方便應用層能區分不同的請求,雖然grpc內部有每個請求自己的uid,但是對於應用層卻不能用,通過在這裡設定一個tag,可以方便的區別,因為非同步的可能會多個請求通過rpc發出,接收到後都放到了CompletionQueue中,所以應用層要能區分,就要設定一個tag)
rpc->Finish(&reply, &status, (void*)1);
void* got_tag;
bool ok = false;
// CompletionQueue的next會阻塞到有一個結果為止,got_tag會設定成前面的tag,而ok會設定成程式碼是否成功的拿到了響應(ok與status不同,status是表示伺服器的結果,而ok只是表示阻塞佇列中Next的結果)
GPR_ASSERT(cq.Next(&got_tag, &ok));
// 通過tag區分不同的響應對應的請求
GPR_ASSERT(got_tag == (void*)1);
GPR_ASSERT(ok);
// 最終返回值
if (status.ok()) {
return reply.message();
} else {
return "RPC failed";
}
}
private:
std::unique_ptr<Greeter::Stub> stub_;
};
有三點要注意的 :
1. 上面展示了非同步呼叫的使用方式,但是實際工作中不會這麼去用,因為它這樣寫還是一個同步的;
2. 這種非同步的方式是基於CompletionQueue做的,與protobuf自帶的rpc基於回撥的方式完全不同,如果是protobuf rpc,那麼非同步回撥會由rpc內部執行緒來做;而grpc在實際工作中,還是得自己有一個執行緒去呼叫CompletionQueue的next來等待訊息。
3. 這個protobuf rpc方式相差很大,但是思想卻是回撥由誰來呼叫引起的,由rpc執行緒來呼叫,那就要傳回調給內部,如果由使用者執行緒來呼叫,就要由一個佇列先儲存結果;
這裡有一個疑問:
為什麼要自己建立一個reply,而不是讓grpc內部自己建立,一方面增加了一個引數,另一方面也增加了管理的複雜性,比如有多次呼叫非同步介面,那麼為了有在另一個執行緒中使用,要把這個reply物件儲存起來,再根據tag去讀取不同的物件,這樣比較複雜,如果是grpc內部建立,那麼就麼可以把結果儲存到CompletionQueue中,通過next來得到結果。如果只是為了保持誰建立誰銷燬的原則,如果是內部建立,可以通過返回一個std::unique_ptr來儲存可以自動銷燬。
所以總的來說,非同步呼叫介面對於應用層來說還不是不太方便,可以像protobuf rpc一樣介面一個回撥函式(引數就是response, status和tag);這樣更方便一些;
GRPC伺服器非同步
這裡的非同步伺服器是指這個情況:當一個任務需要時間比較長時,不能一直佔用工作執行緒,這時需要啟動另一個執行緒去做,完成之後通知回server的工作執行緒處理;
在這種情況其實和客戶端的需要用非同步的情況一樣,都是不能阻塞執行緒,讓另一個執行緒處理。
如果是自己來實現這樣的邏輯,會怎麼處理呢?
收到非同步呼叫請求-》首先是儲存上下文(比如呼叫引數,client的writer),加入一個佇列中-》在新的執行緒從佇列中取出任務處理-》呼叫writer向client寫資料。
grpc的基本流程:
// 註冊一個服務,並啟動
helloworld::Greeter::AsyncService service;
ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
builder.RegisterAsyncService(&service);
auto cq = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();
// 建立一個任務,去等待請求(非同步,所以這裡會馬上返回), 當收到一個sayhello的呼叫之後,context,request,responder進行初始化,然後加入佇列中(根據tag來設定)
ServerContext context;
HelloRequest request;
ServerAsyncResponseWriter<HelloReply> responder;
service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
// 等待佇列結果,如果tag是上面設定的值,說明這個結果就是得到了請求呼叫,然後就可以開始處理了
HelloReply reply;
Status status;
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)1) {
// set reply and status
// 建立完成之後,呼叫responder.Finish傳送結果,如果傳送成功,也會以tag為標識加入佇列中
responder.Finish(reply, status, (void*)2);
}
// 處理完成之後,刪除自己
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)2) {
// clean up
}
從上面的流程和一般的想法有點不一樣:
它是先呼叫RequestSayHello開啟處理流程(相當於註冊了一個處理器)-》然後非同步請求呼叫到達-》根據RequestSayHello引數加入一個佇列-》從佇列中取出資料處理-》呼叫responder.Finish傳送-》傳送狀態又會入佇列-》清理這次的處理流程。
和同步呼叫比起來,沒有去註冊service,而就直接註冊對應的方法;並且由於是非同步處理,所以伺服器不主動呼叫,所以它會把收到的請求等資訊放入佇列,我們自己要去遍歷;
上面的是基本的流程,下面是一個完整的例子,通過CallData物件,封裝了一個非同步請求的所以動作(這個物件封裝得很好,一種非同步請求一個型別,外界不用關注它的內部實現);
grpc的伺服器非同步呼叫完整程式碼:
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// 開始註冊service,並執行
void Run() {
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// 開始處理執行緒
HandleRpcs();
}
private:
// 這個類封裝了非同步請求處理所需要的狀態與邏輯
// Class encompasing the state and logic needed to serve a request.
class CallData {
public:
// 建立一個calldata物件,然後開始呼叫呼叫對應的函式;這個物件封裝了請求,ctx(用於傳送訊息給軍客戶端)
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// 開始註冊SayHello非同步處理流程,當收到一個sayhello的請求呼叫後,會加入佇列
status_ = PROCESS;
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
} else if (status_ == PROCESS) {
// 這裡又新建立了處理流程,不然可能在處理過程中有請求來就不能及時處理
new CallData(service_, cq_);
// 從佇列中拿到請求,真正的處理邏輯,這個會在一個新執行緒中執行
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// 設定狀態
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
// 傳送完成之後入佇列中的資料
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
private:
// 非同步service
Greeter::AsyncService* service_;
// 一個生產者消費者佇列
ServerCompletionQueue* cq_;
ServerContext ctx_;
HelloRequest request_;
HelloReply reply_;
// 用於訊息回覆的方法
ServerAsyncResponseWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_; // The current serving state.
};
// This can be run in multiple threads if needed.
void HandleRpcs() {
// 建立一個新的,由於新建立的status為create,所以它馬上會開始service的RequestSayHello方法
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
// 從
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}