Tags: c++, memory-leaks, grpc, grpc-c++

Memory leak in gRPC async_client


I am using the gRPC async client in a similar way to the official example.

In this example (published in the official gRPC GitHub repository), the client allocates memory for each message it sends and uses the object's address as the tag for the completion queue. When the reply arrives in the listener thread, that memory (identified by the tag, i.e. the address) is freed.

I'm worried about the situation where the server never responds to a message, so the memory is never freed.

  • Does gRPC protect me from this situation?
  • Should I implement it in a different way (using smart pointers, saving the pointers in a data structure, etc.)?

Async client send function

void SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);

    // Call object to store rpc data
    AsyncClientCall* call = new AsyncClientCall;

    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsyncSayHello(&call->context, request, &cq_);

    // StartCall initiates the RPC call
    call->response_reader->StartCall();

    call->response_reader->Finish(&call->reply, &call->status, (void*)call);

}
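
For reference, the call object used above looks roughly like this in the official example (reproduced from memory, so member names may differ slightly):

// Per-RPC state; the raw pointer to it doubles as the completion queue tag.
struct AsyncClientCall {
    // Container for the data we expect back from the server.
    HelloReply reply;

    // Client context; can carry deadlines, metadata, etc.
    grpc::ClientContext context;

    // Status of the RPC, filled in when it completes.
    grpc::Status status;

    // Reader that receives the server's response asynchronously.
    std::unique_ptr<grpc::ClientAsyncResponseReader<HelloReply>> response_reader;
};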

Async client receive function (runs in a dedicated thread)

void AsyncCompleteRpc() {
    void* got_tag;
    bool ok = false;

    // Block until the next result is available in the completion queue "cq".
    while (cq_.Next(&got_tag, &ok)) {
        // The tag in this example is the memory location of the call object
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

        // Verify that the request was completed successfully. Note that "ok"
        // corresponds solely to the request for updates introduced by Finish().
        GPR_ASSERT(ok);

        if (call->status.ok())
            std::cout << "Greeter received: " << call->reply.message() << std::endl;
        else
            std::cout << "RPC failed" << std::endl;

        // Once we're complete, deallocate the call object.
        delete call;
    }
}

Main

int main(int argc, char** argv) {


    GreeterClient greeter(grpc::CreateChannel(
            "localhost:50051", grpc::InsecureChannelCredentials()));

    // Spawn reader thread that loops indefinitely
    std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);

    for (int i = 0; i < 100; i++) {
        std::string user("world " + std::to_string(i));
        greeter.SayHello(user);  // The actual RPC call!
    }

    std::cout << "Press control-c to quit" << std::endl << std::endl;
    thread_.join();  //blocks forever

    return 0;
}

Solution

  • Does gRPC protect me from this situation?

    Kinda. gRPC guarantees that all queued operations will end up in their matching completion queue sooner or later. So your code is ok as long as:

    • No exception is thrown at an unfortunate time (illustrated in the sketch below).
    • You don't change the code in a way that creates a code path where the operation is never queued or the call is never deleted.

    In other words: It's ok, but fragile.
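
    To make the fragility concrete, here is a hypothetical variant of SayHello; the logging helper is invented purely for illustration and is not part of the original code:

        void SayHello(const std::string& user) {
            HelloRequest request;
            request.set_name(user);

            AsyncClientCall* call = new AsyncClientCall;

            // Hypothetical later addition. If this throws, `call` is leaked:
            // nothing deletes it here, and because Finish() was never queued,
            // the completion queue thread will never see its tag either.
            log_outgoing_request(user);  // invented helper, for illustration only

            call->response_reader =
                stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
            call->response_reader->StartCall();
            call->response_reader->Finish(&call->reply, &call->status, (void*)call);
        }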

    Option A:

    If you want to be truly robust, the way to go is std::shared_ptr<>. However, shared pointers can mess with multithreaded performance in unexpected ways, so whether it's worth it depends on where your app lands on the performance-vs-robustness spectrum.

    Such a refactor would look like:

    1. Have AsyncClientCall inherit from std::enable_shared_from_this
    2. Change the construction of call to std::make_shared<AsyncClientCall>()
    3. In the completion queue handler, increase the ref-count by taking shared ownership from the tag (a fuller sketch follows below):

        while (cq_.Next(&got_tag, &ok)) {
            auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();

    And get rid of the delete, obviously.
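
    A fuller sketch of Option A could look like the following. The answer does not say who owns the shared_ptr while the RPC is in flight, so this sketch assumes (my addition, not the answer's) that the client keeps in-flight calls in a mutex-guarded set, which is what lets shared_from_this() in the handler find a live owner:

        // Assumed extra members on GreeterClient (not in the original code):
        //   std::mutex mu_;
        //   std::set<std::shared_ptr<AsyncClientCall>> pending_;
        // Requires <memory>, <mutex>, <set>.

        // 1. Inherit from std::enable_shared_from_this.
        struct AsyncClientCall : std::enable_shared_from_this<AsyncClientCall> {
            HelloReply reply;
            grpc::ClientContext context;
            grpc::Status status;
            std::unique_ptr<grpc::ClientAsyncResponseReader<HelloReply>> response_reader;
        };

        void SayHello(const std::string& user) {
            HelloRequest request;
            request.set_name(user);

            // 2. Construct via make_shared.
            auto call = std::make_shared<AsyncClientCall>();

            // Keep an owning reference while the RPC is in flight.
            {
                std::lock_guard<std::mutex> lock(mu_);
                pending_.insert(call);
            }

            call->response_reader =
                stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
            call->response_reader->StartCall();

            // The raw address is still the tag; ownership stays in pending_.
            call->response_reader->Finish(&call->reply, &call->status, call.get());
        }

        void AsyncCompleteRpc() {
            void* got_tag;
            bool ok = false;

            while (cq_.Next(&got_tag, &ok)) {
                // 3. Re-acquire shared ownership (bumps the ref-count).
                auto call = static_cast<AsyncClientCall*>(got_tag)->shared_from_this();

                GPR_ASSERT(ok);

                if (call->status.ok())
                    std::cout << "Greeter received: " << call->reply.message() << std::endl;
                else
                    std::cout << "RPC failed" << std::endl;

                // Drop the owning reference; the object is freed once the last
                // shared_ptr (including `call`) goes away. No delete needed.
                std::lock_guard<std::mutex> lock(mu_);
                pending_.erase(call);
            }
        }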

    Option B:

    You can also get a decent halfway measure with unique_ptr<>. Grab the raw pointer before handing the tag over, because the Finish() arguments are evaluated in an unspecified order and must not read through `call` after it has been released:

        auto call = std::make_unique<AsyncClientCall>();
        ...
        auto* raw = call.get();
        raw->response_reader->Finish(&raw->reply, &raw->status, (void*)call.release());

    and

        std::unique_ptr<AsyncClientCall> call{static_cast<AsyncClientCall*>(got_tag)};
    

    This guards against refactors and exceptions while keeping everything else the same. However, it is only usable for unary RPCs that produce a single completion event. Streaming RPCs, or RPCs that exchange metadata, will need completely different handling.
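
    Putting Option B together, the two functions from the question would look roughly like this (my sketch, not the answer's code; assumes C++14 for std::make_unique):

        void SayHello(const std::string& user) {
            HelloRequest request;
            request.set_name(user);

            // Owned locally until the operation is queued; if anything throws
            // before release(), the unique_ptr deletes the call object.
            auto call = std::make_unique<AsyncClientCall>();

            call->response_reader =
                stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
            call->response_reader->StartCall();

            // Grab the raw pointer first: the Finish() arguments are evaluated
            // in an unspecified order, so nothing may read through `call` once
            // it has been released.
            auto* raw = call.get();
            raw->response_reader->Finish(&raw->reply, &raw->status,
                                         (void*)call.release());
        }

        void AsyncCompleteRpc() {
            void* got_tag;
            bool ok = false;

            while (cq_.Next(&got_tag, &ok)) {
                // Take ownership back immediately; the call object is deleted
                // at the end of each iteration even if the code below throws.
                std::unique_ptr<AsyncClientCall> call{
                    static_cast<AsyncClientCall*>(got_tag)};

                GPR_ASSERT(ok);

                if (call->status.ok())
                    std::cout << "Greeter received: " << call->reply.message() << std::endl;
                else
                    std::cout << "RPC failed" << std::endl;
                // No explicit delete needed.
            }
        }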