1. 程式人生 > 其它 >grpc c++流式傳輸demo

grpc c++流式傳輸demo

目錄結構

 

 編譯指令碼build.sh

if [ -d "./proto_code" ];then
    rm -rf ./proto_code
fi
mkdir ./proto_code
protoc -I ./ --grpc_out=./proto_code --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ./reduce.proto
protoc -I ./ --cpp_out=./proto_code ./reduce.proto

if [ -d "./build" ];then
    rm -rf ./build
fi
mkdir ./build
cd build
cmake ..
make
cd ..

CMakeLists.txt

cmake_minimum_required(VERSION 3.5)
project(test2)
find_package(Threads REQUIRED)
find_package(Protobuf REQUIRED)
set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
set(_REFLECTION gRPC::grpc++_reflection)
find_package(gRPC CONFIG REQUIRED)
set(_GRPC_GRPCPP gRPC::grpc++)

SET(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} 
"-pthread") include_directories("${CMAKE_CURRENT_BINARY_DIR}/../proto_code") set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/../proto_code/reduce.pb.cc") set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/../proto_code/reduce.pb.h") set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/../proto_code/reduce.grpc.pb.cc") set
(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/../proto_code/reduce.grpc.pb.h") # hw_grpc_proto add_library(hw_grpc_proto ${hw_grpc_srcs} ${hw_grpc_hdrs} ${hw_proto_srcs} ${hw_proto_hdrs}) target_link_libraries(hw_grpc_proto ${_REFLECTION} ${_GRPC_GRPCPP} ${_PROTOBUF_LIBPROTOBUF}) # Targets greeter_[async_](client|server) foreach(_target reduce_server reduce_client) add_executable(${_target} "${_target}.cc") target_link_libraries(${_target} hw_grpc_proto ${_REFLECTION} ${_GRPC_GRPCPP} ${_PROTOBUF_LIBPROTOBUF}) endforeach()

proto檔案

syntax="proto3";

option java_multiple_files=true;
option java_package="io.grpc.example.reduce";
option java_outer_classname="ReduceProto";
option objc_class_prefix="RDC";

package reduce;

service ReduceService{
    rpc getData(stream Data) returns(stream Data){}
}

message Data{
    int32 data=1;
}

服務端程式碼

#include <iostream>
#include <string>
#include <memory>
#include <pthread.h>
#include <unistd.h>
#include <vector>

#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include "./proto_code/reduce.grpc.pb.h"

using std::cout;
using std::endl;
using std::string;
using std::unique_ptr;
using std::shared_ptr;
using std::vector;

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerWriter;
using grpc::ServerReaderWriter;
using grpc::Status;

using reduce::Data;
using reduce::ReduceService;

class ReduceImpl final: public ReduceService::Service{
public:
    Status getData(ServerContext* context,ServerReaderWriter<Data,Data>* stream)override
    {
        Data read_data;
        while(stream->Read(&read_data))
        {
            pthread_mutex_lock(&mu_);
            received_data.push_back(read_data);
            cout<<"receive data: "<<read_data.data()<<endl;
            pthread_mutex_unlock(&mu_);
        }
        for(const Data& n:received_data)
        {
            stream->Write(n);
            cout<<"=====write "<<n.data()<<endl;
        }
        return Status::OK;
    }
private:
    pthread_mutex_t mu_=PTHREAD_MUTEX_INITIALIZER;
    vector<Data> received_data;
};

void RunServer()
{
    string server_addr("0.0.0.0:50051");
    ReduceImpl service;
    ServerBuilder builder;
    builder.AddListeningPort(server_addr,grpc::InsecureServerCredentials());
    builder.RegisterService(&service);
    unique_ptr<Server> server(builder.BuildAndStart());
    cout<<"Server listening on "<<server_addr<<endl;
    server->Wait();
}

int main(int argc,char** argv)
{
    RunServer();
    return 0;
}

客戶端程式碼

#include <iostream>
#include <memory>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <thread>
#include <vector>

#include <grpc/grpc.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>
#include "./proto_code/reduce.grpc.pb.h"

using std::cout;
using std::endl;
using std::string;

using grpc::Channel;
using grpc::ClientContext;
using grpc::ClientReader;
using grpc::ClientWriter;
using grpc::ClientReaderWriter;
using grpc::Status;

using reduce::Data;
using reduce::ReduceService;

class ReduceClient{
public:
    ReduceClient(std::shared_ptr<Channel> channel):stub_(ReduceService::NewStub(channel)){}
    void sendData()
    {
        ClientContext context;
        std::shared_ptr<ClientReaderWriter<Data,Data>> stream(stub_->getData(&context));

        auto thread_func=[stream](){
            std::vector<Data> v_tmp;
            for(int i=0;i<50;++i)
            {
                Data tmp;
                tmp.set_data(i);
                v_tmp.push_back(tmp);
            }
            for(Data& d:v_tmp)
            {
                std::cout<<"write"<<std::endl;
                stream->Write(d);
            }
            stream->WritesDone();
        };
        //pthread_create(&client_send_thread,NULL,(void* (*)(void*))&thread_func,NULL);
        std::thread writer(thread_func);
        
        Data recv_data;
        while(stream->Read(&recv_data))
        {
            std::cout<<"data "<<recv_data.data()<<" send success"<<std::endl;
        }
        writer.join();

        Status status=stream->Finish();
        if(!status.ok())
        {
            std::cout<<"getData rpc failed"<<std::endl;
        }
    }
private:
    std::unique_ptr<ReduceService::Stub> stub_;
    pthread_t client_send_thread;
};

int main(int argc,char** argv)
{
    ReduceClient client1(grpc::CreateChannel("localhost:50051",grpc::InsecureChannelCredentials()));
    client1.sendData();
    return 0;
}

流式傳輸使用stream,stream中有Read和Write,客戶端和服務端都要呼叫Read和Write

目前的問題是沒辦法寫成死迴圈,就是一直髮送資料。一旦寫成死迴圈就會報這個錯

E0323 16:50:21.462651883 1254646 call_op_set.h:985] assertion failed: false
Aborted (core dumped)

然後core就dump了