Search code examples
c++grpc

grpc::ClientBidiReactor crashes when create ClientBidiReactor Continuously


I have a function as follows:

  ErrorCode SendData(const char *data, size_t length)
  {
    auto stream = std::make_unique<Stream>(stub_.get(), client_id_, timeout_seconds_);
    return stream->SendMessage(data, length);
  }

and Stream class:

class Stream : public grpc::ClientBidiReactor<SendBinaryDataRequest, SendBinaryDataResponse> {
   public:
    Stream(MessageTransfer::Stub *stub)
        : context_(), status_(grpc::Status::OK), done_(false), request_(), response_(), mu_(), cv_(), 
    {
      stub->async()->SendStreamBinaryDataWithAck(&context_, this);
      StartCall();
      std::cout << "new construtor:" << this << std::endl;
    }

    void OnWriteDone(bool ok) override
    {
      std::unique_lock<std::mutex> l(mu_);
      if (ok) {
        StartRead(&response_);
      }
    }

    void OnWritesDoneDone(bool) override
    {
      std::unique_lock<std::mutex> l(mu_);
      cv_.notify_one();
    }
    
    void OnReadDone(bool ok) override
    {
      std::unique_lock<std::mutex> l(mu_);
      if (ok) {
        cv_.notify_one();
      }
    }

    void OnDone(const grpc::Status &s) override
    {
      std::unique_lock<std::mutex> l(mu_);
      LOG("%p stream closed status:%d error:%s\n", this, s.error_code(), s.error_message().c_str());
      status_ = s;
      done_   = true;
      cv_.notify_one();
    }

    ErrorCode SendMessage(const char *data, size_t length)
    {
      std::unique_lock<std::mutex> l(mu_);

      if (done_) {
        return convertToErrorCode(status_);
      }

      request_.set_data(data, length);
      LOG("StartWrite %p\n", this);
      StartWrite(&request_);
      if(std::cv_status::timeout == cv_.wait_for(l, std::chrono::seconds(timeout_sec_))) {
        return ErrorCode_TIMEOUT;
      }
      if (done_) {
      return convertToErrorCode(status_);
      }
      return static_cast<ErrorCode>(response_.error_code());
    }
  }

when I use SendData to non-existent server, it will print "failed to connect to all addresses", this is right; However when I call SendData several times, it crashes:

pure virtual method called terminate called without an active exception Aborted (core dumped)

log:

new construtor:0x168dbc0
StartWrite 0x168dbc0
0x168dbc0 stream closed status:14 error:failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/hs_message.sock: Connection refused
new construtor:0x168dbc0
StartWrite 0x168dbc0
Segmentation fault (core dumped)

It seems the ptr is same every time I make a stream. But I dont think it should be cause of the crash

gdb info another gdb info Can anyone help me? thanks


Solution

  • At last I find the program runs successfully after I added AddHold/RemoveHold Codes like this:

    class Stream : public grpc::ClientBidiReactor<SendBinaryDataRequest,SendBinaryDataResponse> {
    public:
    Stream(MessageTransfer::Stub *stub)
        : context_(), status_(grpc::Status::OK), done_(false), request_(), response_(), mu_(), cv_(), 
    {
      stub->async()->SendStreamBinaryDataWithAck(&context_, this);
      AddHold()
      StartCall();
      std::cout << "new construtor:" << this << std::endl;
    }
    
    void OnWriteDone(bool ok) override
    {
      std::unique_lock<std::mutex> l(mu_);
      if (ok) {
        StartRead(&response_);
      } else {
        RemoveHold();
      }
    }
    
    void OnWritesDoneDone(bool) override
    {
      std::unique_lock<std::mutex> l(mu_);
      cv_.notify_one();
    }
    
    void OnReadDone(bool ok) override
    {
      std::unique_lock<std::mutex> l(mu_);
      if (ok) {
        cv_.notify_one();
      } else {
        RemoveHold();
      }
    }
    
    void OnDone(const grpc::Status &s) override
    {
      std::unique_lock<std::mutex> l(mu_);
      LOG("%p stream closed status:%d error:%s\n", this, s.error_code(), s.error_message().c_str());
      status_ = s;
      done_   = true;
      cv_.notify_one();
    }
    
    ErrorCode SendMessage(const char *data, size_t length)
    {
      std::unique_lock<std::mutex> l(mu_);
    
      if (done_) {
        return convertToErrorCode(status_);
      }
    
      request_.set_data(data, length);
      LOG("StartWrite %p\n", this);
      StartWrite(&request_);
      if(std::cv_status::timeout == cv_.wait_for(l, std::chrono::seconds(timeout_sec_))) {
        return ErrorCode_TIMEOUT;
      }
      if (done_) {
      return convertToErrorCode(status_);
      }
      return static_cast<ErrorCode>(response_.error_code());
    }
    

    }