I am using the gRPC async client in a similar way to the example.
In this example (published in the official gRPC GitHub repository), the client allocates memory for the message to send and uses its address as the tag for the completion queue. When the message is answered, the listener thread frees the memory (identified by the tag, i.e. the address).
I'm afraid of a situation where the server does not respond to a message and the memory is never freed. Does gRPC protect me from this situation?
Async client send function
void SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);
    // Call object to store rpc data
    AsyncClientCall* call = new AsyncClientCall;
    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
    // StartCall initiates the RPC call
    call->response_reader->StartCall();
    call->response_reader->Finish(&call->reply, &call->status, (void*)call);
}
Async client receive function for thread
void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;
    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
        // The tag in this example is the memory location of the call object
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
        // Verify that the request was completed successfully. Note that "ok"
        // corresponds solely to the request for updates introduced by Finish().
        GPR_ASSERT(ok);
        if (call->status.ok())
            std::cout << "Greeter received: " << call->reply.message() << std::endl;
        else
            std::cout << "RPC failed" << std::endl;
        // Once we're complete, deallocate the call object.
        delete call;
    }
}
Main
int main(int argc, char** argv) {
    GreeterClient greeter(grpc::CreateChannel(
        "localhost:50051", grpc::InsecureChannelCredentials()));
    // Spawn reader thread that loops indefinitely
    std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
    for (int i = 0; i < 100; i++) {
        std::string user("world " + std::to_string(i));
        greeter.SayHello(user);  // The actual RPC call!
    }
    std::cout << "Press control-c to quit" << std::endl << std::endl;
    thread_.join();  // blocks forever
    return 0;
}
Does gRPC protect me from this situation?
Kinda. gRPC guarantees that all queued operations will end up in their matching completion queue sooner or later. So your code is ok as long as:
In other words: It's ok, but fragile.
If you want to be truly robust, the way to go is std::shared_ptr<>. However, shared pointers can affect multithreaded performance in unexpected ways, so whether it's worth it depends on where your app lands on the performance vs. robustness spectrum.
Such a refactor would look like:
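Have AsyncClientCall inherit from std::enable_shared_from_this<AsyncClientCall>.
Change the creation of call to std::make_shared<AsyncClientCall>(). Something must keep that shared_ptr alive while the RPC is in flight (for example, a container of pending calls); otherwise the object is destroyed as soon as SayHello() returns.
In the completion loop, recover the shared pointer from the tag:
    while (cq_.Next(&got_tag, &ok)) {
        auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();
And get rid of the delete, obviously.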
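To make the ownership flow concrete, here is a minimal sketch of the shared_ptr variant with gRPC taken out of the picture. Everything in it is hypothetical and only for illustration: SharedCall stands in for AsyncClientCall, MockClient for the client class, a plain std::queue<void*> for the completion queue, and pending_ is one possible place to park the owning pointer while a call is in flight.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for AsyncClientCall (no gRPC types involved).
struct SharedCall : std::enable_shared_from_this<SharedCall> {
    std::string reply;
};

// Hypothetical stand-in for the client; cq_ mimics a queue of void* tags.
class MockClient {
public:
    // Create the call with make_shared, keep one owner in pending_,
    // and queue the raw address as the tag.
    void Start(const std::string& user) {
        auto call = std::make_shared<SharedCall>();
        call->reply = "Hello " + user;
        pending_.emplace(call.get(), call);
        cq_.push(call.get());
    }

    // Handle every queued tag and return how many were processed.
    std::size_t Drain() {
        std::size_t handled = 0;
        while (!cq_.empty()) {
            void* got_tag = cq_.front();
            cq_.pop();
            // Recover shared ownership from the raw tag.
            auto call = static_cast<SharedCall*>(got_tag)->shared_from_this();
            // Drop the in-flight owner; the local "call" is now the last one.
            pending_.erase(call.get());
            assert(call.use_count() == 1);
            ++handled;
        }  // each "call" is freed when it goes out of scope; no delete needed
        return handled;
    }

    std::size_t PendingCount() const { return pending_.size(); }

private:
    std::queue<void*> cq_;
    std::unordered_map<SharedCall*, std::shared_ptr<SharedCall>> pending_;
};
```

With this layout, a call whose tag never comes back is still released when the MockClient (and with it pending_) is destroyed, which is the robustness gain over a bare new/delete pair. In a real multithreaded client the pending container would need its own synchronization.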
You can also get a decent halfway measure with std::unique_ptr<>:
auto call = std::make_unique<AsyncClientCall>();
...
call->response_reader->Finish(&call->reply, &call->status, (void*)call.release());
and
std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};
This guards against refactors and exceptions while keeping everything else the same. However, it only works for unary RPCs, which produce a single completion event. Streaming RPCs, or RPCs that exchange metadata, need completely different handling.
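The release-then-reclaim handoff can be tried out without gRPC. The following is a rough sketch under stated assumptions: OwnedCall, Start, Drain and the g_live_calls counter are all hypothetical names, and a std::queue<void*> plays the role of the completion queue. It only illustrates the single-completion case described above.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <string>

// Hypothetical counter of live call objects, used only to observe cleanup.
int g_live_calls = 0;

// Hypothetical stand-in for AsyncClientCall (no gRPC types involved).
struct OwnedCall {
    std::string reply;
    OwnedCall() { ++g_live_calls; }
    ~OwnedCall() { --g_live_calls; }
};

// Build the call under unique_ptr ownership, then hand the raw pointer
// to the queue as the tag.
void Start(std::queue<void*>& cq, const std::string& user) {
    auto call = std::make_unique<OwnedCall>();
    // If this line throws, the unique_ptr still frees the call.
    call->reply = "Hello " + user;
    // From here on the queue entry is the only owner.
    cq.push(call.release());
}

// Re-wrap each tag in a unique_ptr so it is freed on every exit path.
std::size_t Drain(std::queue<void*>& cq) {
    std::size_t handled = 0;
    while (!cq.empty()) {
        std::unique_ptr<OwnedCall> call{static_cast<OwnedCall*>(cq.front())};
        cq.pop();
        assert(call != nullptr);
        ++handled;
    }  // "call" is destroyed at the end of each iteration
    return handled;
}
```

Unlike the shared_ptr version, nothing owns a call between release() and the moment its tag is read back, so a tag that is never delivered would still leak here.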