gRPC 實現多執行緒非同步服務端
阿新 • • 發佈:2019-01-05
// AsyncServer_Demo.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include <memory> #include <iostream> #include <string> #include <thread> #include <process.h> #include <grpc++/grpc++.h> #include <grpc/support/log.h> #include "asyncservice.grpc.pb.h" using grpc::Server; using grpc::ServerAsyncResponseWriter; using grpc::ServerBuilder; using grpc::ServerContext; using grpc::ServerCompletionQueue; using grpc::Status; using cecily::Student; using cecily::Teacher; using cecily::School; class ServerImpl final{ public: ~ServerImpl() { server_->Shutdown(); //停止服務 cq_->Shutdown(); } void Run() { std::string server_address("0.0.0.0:66666"); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); std::cout << "Server listening on " << server_address << std::endl; //註冊服務 通過CallData物件的狀態,在初始狀態下,將對應型別的CallData物件傳遞進入ServerCompletionQueue 的 cq_物件中 //當有服務請求過來的時候 呼叫next 能得到對應的請求, 請求型別通過CallData物件的s_type_值區分,用來區分不同的服務,做不同的處理 new CallData(&service_, cq_.get(), ServerImpl::CallData::SS_HelloStudent); new CallData(&service_, cq_.get(), ServerImpl::CallData::SS_HelloTeacher); //註冊服務 應該有多少個服務有要註冊多少個服務, 例子只有兩個服務 //開啟多執行緒處理rpc呼叫請求 for ( int i = 0 ; i < 8; i++ ) { _beginthreadex(NULL, 0, ServerImpl::ThreadHandlerRPC, (void*)this, 0, 0); } } private: class CallData { public: enum ServiceType { SS_HelloStudent = 0, SS_HelloTeacher }; public: CallData(School::AsyncService* service, ServerCompletionQueue* cq, ServiceType s_type) :service_(service),cq_(cq),s_type_(s_type), student_response(&ctx_), teacher_response(&ctx_), status_(CREATE){ Process(); } void Process() { if ( status_ == CREATE ) { status_ = PROCESS; switch (s_type_) { //根據不同的服務 註冊不同的 服務型別到 ServerCompletionQueue 佇列, 看名字和使用有點像完成埠 (沒有去驗證研究) case ServerImpl::CallData::SS_HelloStudent: service_->RequestHelloStudent(&ctx_, &student_request, &student_response, cq_, cq_, this); break; case ServerImpl::CallData::SS_HelloTeacher: service_->RequestHelloTeacher(&ctx_, &teacher_request, &teacher_response, cq_, cq_, this); break; default: break; } } else if (status_ == PROCESS) { status_ = FINISH; new CallData(service_, cq_, this->s_type_); switch (s_type_) { case ServerImpl::CallData::SS_HelloStudent: { std::string name = student_request.name(); int age = student_request.age(); std::string prefix("Cecily HelloStudent: "); int nThreadID = GetCurrentThreadId(); char szTmp[20] = { 0 }; sprintf_s(szTmp,20, "執行緒:%d", nThreadID); std::string end(szTmp); teacher_request.set_name(prefix + name + end); status_ = FINISH; student_response.Finish(teacher_request, Status::OK, this); } break; case ServerImpl::CallData::SS_HelloTeacher: { std::string schoolname = teacher_request.school(); std::string prefix("Cecily HelloTeacher: "); int nThreadID = GetCurrentThreadId(); char szTmp[20] = { 0 }; sprintf_s(szTmp, 20, "執行緒:%d", nThreadID); std::string end(szTmp); student_request.set_name(prefix + schoolname +end); status_ = FINISH; teacher_response.Finish(student_request, Status::OK, this); } break; default: break; } } else { GPR_ASSERT(status_ == FINISH); delete this; } } private: School::AsyncService* service_; ServerCompletionQueue* cq_; ServerContext ctx_; ServiceType s_type_; Student student_request; ::grpc::ServerAsyncResponseWriter< ::cecily::Teacher> student_response; Teacher teacher_request; ::grpc::ServerAsyncResponseWriter< ::cecily::Student> teacher_response; enum CallStatus { CREATE, PROCESS, FINISH }; CallStatus status_; }; private: static unsigned __stdcall ThreadHandlerRPC(void* lparam) { ServerImpl* impl = (ServerImpl*)lparam; impl->HandleRPCS(); return 1; } void HandleRPCS() { void* tag; bool ok; while (true) { GPR_ASSERT(cq_->Next(&tag, &ok));//從ServerCompletionQueue 佇列拿到獲取請求任務,根據請求任務處理 邏輯 GPR_ASSERT(ok); static_cast<CallData*>(tag)->Process(); } } private: std::shared_ptr<ServerCompletionQueue> cq_; School::AsyncService service_; std::shared_ptr<Server> server_; }; int main() { ServerImpl server; server.Run(); char c; std::cin >> c; system("pause"); return 0; }
以上內容屬於學習記錄隨筆,因為從接觸到寫這個部落格只有兩天時間,中間難免會有不正確的地方,後續發現再更改。