How can I read from and write to a gRPC stream simultaneously?


I am implementing the Raft algorithm and want to use gRPC streams for communication between nodes. My idea is for each node to open three streams to every other peer, one per RPC type: AppendEntries, RequestVote and InstallSnapshot. The route_guide example was of limited help, because in its bidirectional streaming demo, RouteChat, the client sends all of its data before it starts to read.

First, I want to be able to write to a stream at any time, so I wrote the following code:

void RaftMessagesStreamClientSync::AsyncRequestVote(const RequestVoteRequest& request){
    std::string peer_name = this->peer_name;
    debug("GRPC: Send RequestVoteRequest from %s to %s\n", request.name().c_str(), peer_name.c_str());
    request_vote_stream->Write(request);
}
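
One caveat with writing "at any time": as far as I know, the synchronous streaming API allows one thread to read while another thread writes, but it does not allow several threads to call Write on the same stream concurrently. If AsyncRequestVote can be called from more than one thread, a minimal sketch is to guard Write with a mutex. SerializedWriter is a hypothetical wrapper name, and FakeStream is a hypothetical stand-in for grpc::ClientReaderWriter so that the example compiles without gRPC:

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical wrapper: serializes Write() calls on any stream type that
// exposes `bool Write(const Msg&)`, as grpc::ClientReaderWriter does.
template <typename Stream, typename Msg>
class SerializedWriter {
public:
    explicit SerializedWriter(Stream* stream) : stream_(stream) {}

    // Returns false once the underlying stream reports that it is closed.
    bool Write(const Msg& msg) {
        std::lock_guard<std::mutex> lock(mu_);
        return stream_->Write(msg);
    }

private:
    Stream* stream_;  // not owned
    std::mutex mu_;
};

// Hypothetical stand-in for a gRPC stream, used only to exercise the wrapper.
struct FakeStream {
    std::vector<std::string> sent;
    bool Write(const std::string& msg) {
        sent.push_back(msg);  // not thread-safe on its own
        return true;
    }
};

// Sends `per_thread` messages from each of `threads` threads and returns
// how many messages reached the stream.
inline std::size_t send_from_many_threads(int threads, int per_thread) {
    FakeStream stream;
    SerializedWriter<FakeStream, std::string> writer(&stream);
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&writer, per_thread] {
            for (int i = 0; i < per_thread; ++i) {
                writer.Write("vote");
            }
        });
    }
    for (auto& th : pool) {
        th.join();
    }
    return stream.sent.size();
}
```

With the code from the question, the wrapper would hold request_vote_stream.get() and AsyncRequestVote would call the wrapper instead of the stream directly. The reader thread does not need the lock, since it only calls Read.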

Meanwhile, I want a thread that keeps reading from the stream, as in the following code, which is called immediately after RaftMessagesStreamClientSync is constructed:

void RaftMessagesStreamClientSync::handle_response(){
    // strongThis is required: it keeps this object alive while the reader thread runs
    auto strongThis = shared_from_this();
    t1 = new std::thread([strongThis](){
        RequestVoteResponse response;
        while (strongThis->request_vote_stream->Read(&response)) {
            debug("GRPC: Recv RequestVoteResponse from %s, me %s\n", response.name().c_str(), strongThis->raft_node->name.c_str());
            ...
        }
    });
    ...

To initialize the three streams, I wrote the constructor like this. I use three ClientContext objects because the documentation says that a ClientContext must be used for only one RPC:

struct RaftMessagesStreamClientSync : std::enable_shared_from_this<RaftMessagesStreamClientSync>{
    typedef grpc::ClientReaderWriter<RequestVoteRequest, RequestVoteResponse> CR;
    typedef grpc::ClientReaderWriter<AppendEntriesRequest, AppendEntriesResponse> CA;
    typedef grpc::ClientReaderWriter<InstallSnapshotRequest, InstallSnapshotResponse> CI;

    std::unique_ptr<CR> request_vote_stream;
    std::unique_ptr<CA> append_entries_stream;
    std::unique_ptr<CI> install_snapshot_stream;
    ClientContext context_r;
    ClientContext context_a;
    ClientContext context_i;
    std::thread * t1 = nullptr;
    std::thread * t2 = nullptr;
    std::thread * t3 = nullptr;
    ...
};
RaftMessagesStreamClientSync::RaftMessagesStreamClientSync(const char * addr, struct RaftNode * _raft_node) : raft_node(_raft_node), peer_name(addr) {
    std::shared_ptr<Channel> channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
    stub = raft_messages::RaftStreamMessages::NewStub(channel);
    // 1
    request_vote_stream = stub->RequestVote(&context_r);
    // 2
    append_entries_stream = stub->AppendEntries(&context_a);
    // 3
    install_snapshot_stream = stub->InstallSnapshot(&context_i);    
}
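RaftMessagesStreamClientSync::~RaftMessagesStreamClientSync() {
    raft_node = nullptr;
    // Join only the threads that were actually started; joining through a null pointer would crash.
    for (std::thread* t : {t1, t2, t3}) {
        if (t != nullptr && t->joinable()) {
            t->join();
        }
        delete t;  // deleting a null pointer is a no-op
    }
}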

Then I implemented the server side:

Status RaftMessagesStreamServiceImpl::RequestVote(ServerContext* context, ::grpc::ServerReaderWriter< ::raft_messages::RequestVoteResponse, RequestVoteRequest>* stream){
    RequestVoteResponse response;
    RequestVoteRequest request;
    while (stream->Read(&request)) {
        ...
    }

    return Status::OK;
}

Then two problems occur:

  1. When I test with 3 nodes, which actually creates 2 RaftMessagesStreamServiceImpl for each node, the statements marked 1 to 3 take a long time to execute.
  2. No RPC is received on the server side. Similar problems have been reported when using a Bidi Async Server; however, I can't figure out how that post can help me.

UPDATE

After some debugging, I found that request_vote_stream->Write(request) returns 0 (false), which, according to the documentation, means the stream is closed. But why is it closed?
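
A return value of false from Write says only that the stream is closed, not why. In the synchronous API the reason is carried by the grpc::Status returned from Finish(), which is normally called once Read has returned false. The sketch below shows that pattern. drain_and_report, FakeStatus and FakeStream are hypothetical names, the two Fake types only stand in for grpc::Status and grpc::ClientReaderWriter so that the example compiles without gRPC, and the error message is made up:

```cpp
#include <iostream>
#include <string>

// Reads until the stream ends, then asks the stream why it ended.
// Works with any stream exposing Read(Resp*) and Finish(), where Finish()
// returns a status with ok(), error_code() and error_message(), as gRPC's does.
// Returns the numeric status code (0 means OK).
template <typename Stream, typename Resp>
int drain_and_report(Stream& stream, Resp* response) {
    while (stream.Read(response)) {
        // Handle *response here.
    }
    const auto status = stream.Finish();
    if (!status.ok()) {
        std::cerr << "stream closed with code "
                  << static_cast<int>(status.error_code())
                  << ": " << status.error_message() << "\n";
    }
    return static_cast<int>(status.error_code());
}

// Hypothetical stand-in for grpc::Status.
struct FakeStatus {
    int code;
    std::string message;
    bool ok() const { return code == 0; }
    int error_code() const { return code; }
    std::string error_message() const { return message; }
};

// Hypothetical stand-in for a client stream that yields `remaining`
// responses and then finishes with `final_status`.
struct FakeStream {
    int remaining;
    FakeStatus final_status;
    bool Read(std::string* out) {
        if (remaining == 0) {
            return false;
        }
        --remaining;
        *out = "response";
        return true;
    }
    FakeStatus Finish() { return final_status; }
};

// A stream that never delivers anything and ends with code 14 (UNAVAILABLE).
inline int demo_unavailable() {
    FakeStream stream{0, FakeStatus{14, "failed to connect (made-up message)"}};
    std::string response;
    return drain_and_report(stream, &response);
}
```

In the code from the question, a loop like this would take the place of the body of the lambda in handle_response, so that the reason for the closed stream is logged by the reader thread.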


Solution

  • After some debugging, I found that both problems have the same cause: I created the client before I created the server.

    Because I originally used unary RPC calls, a call made by the client before the server existed only failed with gRPC error code 14 (UNAVAILABLE). The program continued, because every call sent after the server was created was handled correctly.

    However, with streaming calls, stub->RequestVote(&context_r) ends up calling the blocking constructor ClientReaderWriter::ClientReaderWriter, which tries to connect to the server, which has not been created yet.

    /// Block to create a stream and write the initial metadata and \a request
    /// out. Note that \a context will be used to fill in custom initial metadata
    /// used to send to the server when starting the call.
    ClientReaderWriter(::grpc::ChannelInterface* channel,
                     const ::grpc::internal::RpcMethod& method,
                     ClientContext* context)
      : context_(context),
        cq_(grpc_completion_queue_attributes{
            GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
            GRPC_CQ_DEFAULT_POLLING}),  // Pluckable cq
        call_(channel->CreateCall(method, context, &cq_)) {
    if (!context_->initial_metadata_corked_) {
      ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
          ops;
      ops.SendInitialMetadata(context->send_initial_metadata_,
                              context->initial_metadata_flags());
      call_.PerformOps(&ops);
      cq_.Pluck(&ops);
    }
    }
    

    As a consequence, the streams are opened before any connection has been established, so the constructor blocks for a long time and later Write calls fail.
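
One way to avoid depending on start-up order is to wait until the channel is connected before opening the streams. grpc::Channel provides WaitForConnected(deadline), which returns false if the deadline passes first. A minimal sketch of a retry loop around it follows. wait_for_server and FakeChannel are hypothetical names, and FakeChannel only stands in for grpc::Channel so that the example compiles without gRPC:

```cpp
#include <chrono>

// Calls channel.WaitForConnected() in short slices until it succeeds or the
// attempt budget is used up. Returns true once the channel is connected.
template <typename Channel>
bool wait_for_server(Channel& channel, int max_attempts,
                     std::chrono::milliseconds per_attempt) {
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        const auto deadline = std::chrono::system_clock::now() + per_attempt;
        if (channel.WaitForConnected(deadline)) {
            return true;
        }
        // A real client could log the failed attempt here before retrying.
    }
    return false;
}

// Hypothetical stand-in for grpc::Channel: reports "connected" on the
// calls_until_ready-th call and on every call after that.
struct FakeChannel {
    int calls_until_ready;
    int calls;
    explicit FakeChannel(int n) : calls_until_ready(n), calls(0) {}

    template <typename Deadline>
    bool WaitForConnected(Deadline /*deadline*/) {
        return ++calls >= calls_until_ready;
    }
};

// The server becomes reachable on the third attempt.
inline bool demo_server_comes_up() {
    FakeChannel channel(3);
    return wait_for_server(channel, 5, std::chrono::milliseconds(1));
}

// The server does not become reachable within the attempt budget.
inline bool demo_server_never_comes_up() {
    FakeChannel channel(100);
    return wait_for_server(channel, 5, std::chrono::milliseconds(1));
}
```

In the constructor from the question, the call would go between grpc::CreateChannel and the first stub->RequestVote(&context_r), passing *channel. With a real channel a single WaitForConnected call with a longer deadline should also work; the loop only leaves room for logging or for giving up early.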