
Why does shutting down this grpc::CompletionQueue cause an assertion?


In an earlier question, I asked how to unblock a grpc::CompletionQueue::Next() call that is waiting on a grpc::Channel::NotifyOnStateChange(..., gpr_inf_future(GPR_CLOCK_MONOTONIC), ...).

That question is still unanswered, so I am trying a workaround in which the CompletionQueue instead waits on a grpc::Channel::NotifyOnStateChange() with a finite deadline:

// main.cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include <grpcpp/grpcpp.h>
#include <unistd.h>

using namespace std;
using namespace grpc;

void threadFunc(shared_ptr<Channel> ch, CompletionQueue* cq) {
  void* tag = NULL;
  bool ok = false;
  int i = 1;
  grpc_connectivity_state state = ch->GetState(false);
  std::chrono::time_point<std::chrono::system_clock> now =
        std::chrono::system_clock::now();
  std::chrono::time_point<std::chrono::system_clock> deadline =
    now + std::chrono::seconds(2);

  cout << "state " << i++ << " = " << (int)state << endl;
  ch->NotifyOnStateChange(state,
                          //gpr_inf_future(GPR_CLOCK_MONOTONIC),
                          deadline,
                          cq,
                          (void*)1);

  while (cq->Next(&tag, &ok)) {
    state = ch->GetState(false);
    cout << "state " << i++ << " = " << (int)state << endl;
    now = std::chrono::system_clock::now();
    deadline = now + std::chrono::seconds(2);
    ch->NotifyOnStateChange(state,
                            //gpr_inf_future(GPR_CLOCK_MONOTONIC),
                            deadline,
                            cq,
                            (void*)1);
  }

  cout << "thread end" << endl;
}

int main(int argc, char* argv[]) {
  ChannelArguments channel_args;
  CompletionQueue cq;

  channel_args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
  channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
  channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
  channel_args.SetInt(GRPC_ARG_HTTP2_BDP_PROBE, 0);
  channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
  channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 30000);
  channel_args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS,
                      60000);

  {
    shared_ptr<Channel> ch(CreateCustomChannel("my_grpc_server:50051",
                                               InsecureChannelCredentials(),
                                               channel_args));
    std::thread my_thread(&threadFunc, ch, &cq);
    cout << "sleeping" << endl;
    sleep(5);
    cout << "slept" << endl;
    cq.Shutdown();
    cout << "shut down cq" << endl;
    my_thread.join();
  }
}

Output of the running executable:

$ ./a.out
sleeping
state 1 = 0
state 2 = 0
state 3 = 0
slept
shut down cq
state 4 = 0
E1012 15:29:07.677225824      54 channel_connectivity.cc:234] assertion failed: grpc_cq_begin_op(cq, tag)
Aborted (core dumped)

This version periodically unblocks, as expected, but why does it assert?

My underlying question is: how do you cleanly exit a loop/thread that calls grpc::CompletionQueue::Next() while waiting for a grpc::Channel::NotifyOnStateChange() notification?

In my experience, with an infinite deadline it is impossible to unblock grpc::CompletionQueue::Next(), and with a finite deadline, shutting down the grpc::CompletionQueue triggers an assertion, which is presumably not a clean exit.


Solution

  • The documentation for CompletionQueue::Shutdown() (https://grpc.github.io/grpc/cpp/classgrpc_1_1_completion_queue.html#a40efddadd9073386fbcb4f46e8325670) says:

    Also note that applications must ensure that no work is enqueued on this completion queue after this method is called.

    In other words, once you shut down the CQ, it is illegal to call NotifyOnStateChange() on it again, because doing so enqueues new work.

    In this case, after you call CompletionQueue::Shutdown(), you should expect the Next() call that is already blocked to return the already-requested NotifyOnStateChange() completion, and the following call to Next() to return false, indicating that the CQ is shut down. However, your loop calls NotifyOnStateChange() again right after each completion, without first calling Next() to find out whether the CQ has been shut down. In your run, that happens just after "state 4 = 0" is printed: the new notification request is enqueued after the CQ has been shut down, which is why you're seeing this assertion.
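One way to apply this while keeping a single thread that both waits and re-arms is to guard the re-arm with a flag that the shutdown path sets while holding the same mutex it holds when calling Shutdown(). The sketch below is a self-contained toy model, not gRPC code: ToyCompletionQueue, Watcher, Arm(), Run() and Stop() are hypothetical names. It assumes each operation completes immediately, and it counts work enqueued after Shutdown() instead of aborting. Arm() plays the role of ch->NotifyOnStateChange(...) in threadFunc(), and Stop() plays the role of cq.Shutdown() in main().

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical toy model of the CompletionQueue contract; NOT the gRPC API.
class ToyCompletionQueue {
 public:
  // Models starting an async op that has already completed. Work enqueued
  // after Shutdown() is counted and dropped (real gRPC asserts instead).
  void Enqueue(void* tag) {
    std::lock_guard<std::mutex> lock(mu_);
    if (shutdown_) { ++late_enqueues_; return; }
    ready_.push_back(tag);
    cv_.notify_one();
  }
  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_all();
  }
  // Blocks for the next tag; returns false only once shut down AND drained.
  bool Next(void** tag) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !ready_.empty() || shutdown_; });
    if (ready_.empty()) return false;
    *tag = ready_.front();
    ready_.pop_front();
    return true;
  }
  int late_enqueues() {
    std::lock_guard<std::mutex> lock(mu_);
    return late_enqueues_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<void*> ready_;
  bool shutdown_ = false;
  int late_enqueues_ = 0;
};

// Hypothetical watcher mirroring threadFunc(): it waits on the queue and
// re-arms after every completion, but never once shutdown was requested.
struct Watcher {
  ToyCompletionQueue cq;
  std::mutex mu;               // guards shutting_down
  bool shutting_down = false;
  int completions = 0;         // touched only by the Run() thread

  // Stand-in for ch->NotifyOnStateChange(...): the flag check and the
  // enqueue both happen under `mu`, so Stop() cannot slip in between them.
  void Arm() {
    std::lock_guard<std::mutex> lock(mu);
    if (!shutting_down) cq.Enqueue(reinterpret_cast<void*>(1));
  }

  void Run() {
    void* tag = nullptr;
    while (cq.Next(&tag)) {  // false only after Shutdown() and a full drain
      ++completions;
      // Stand-in for waiting out the 2-second deadline.
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
      Arm();
    }
  }

  // Stand-in for cq.Shutdown() in main(): once this returns, Arm() is a no-op.
  void Stop() {
    std::lock_guard<std::mutex> lock(mu);
    shutting_down = true;
    cq.Shutdown();
  }
};
```

Usage mirrors the question's main(): call w.Arm(), start std::thread t(&Watcher::Run, &w), later call w.Stop() and then t.join(). With the real API, the outstanding notification is presumably delivered only when the channel state changes or its deadline passes, so the final Next() returning false relies on the finite deadline. The flag check is the part that prevents the assertion.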

    In general, the right way to use a CQ is to have a separate, dedicated set of threads that always call Next() in a loop but do not themselves start any new work on the CQs. Starting new work on the CQs should be done in separate thread(s) and should not be done after the CQ is shut down.
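The dedicated-thread arrangement described above can be sketched with the same kind of toy model. Again, this is not gRPC code: ToyCompletionQueue, PollLoop, PollerResult and RunPollerDemo are hypothetical, and operations are assumed to complete immediately. One poller thread only calls Next(). Separate producer threads start all the work. The shutdown order is: join the producers, then call Shutdown(), then join the poller once Next() has drained the queue and returned false.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical toy model of the CompletionQueue contract; NOT the gRPC API.
class ToyCompletionQueue {
 public:
  // Counts and drops work enqueued after Shutdown() (real gRPC asserts).
  void Enqueue(void* tag) {
    std::lock_guard<std::mutex> lock(mu_);
    if (shutdown_) { ++late_enqueues_; return; }
    ready_.push_back(tag);
    cv_.notify_one();
  }
  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_all();
  }
  // Returns false only once shut down AND fully drained.
  bool Next(void** tag) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !ready_.empty() || shutdown_; });
    if (ready_.empty()) return false;
    *tag = ready_.front();
    ready_.pop_front();
    return true;
  }
  int late_enqueues() {
    std::lock_guard<std::mutex> lock(mu_);
    return late_enqueues_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<void*> ready_;
  bool shutdown_ = false;
  int late_enqueues_ = 0;
};

// Poller thread body: drains tags until Next() reports shutdown.
// It never starts new work on the queue.
void PollLoop(ToyCompletionQueue* cq, std::size_t* delivered) {
  void* tag = nullptr;
  while (cq->Next(&tag)) ++*delivered;
}

struct PollerResult {
  std::size_t delivered;
  int late_enqueues;
};

PollerResult RunPollerDemo(int producers, int ops_each) {
  ToyCompletionQueue cq;
  std::size_t delivered = 0;
  std::thread poller(PollLoop, &cq, &delivered);

  // Producer threads are the only ones that start work on the queue.
  std::vector<std::thread> workers;
  for (int p = 0; p < producers; ++p) {
    workers.emplace_back([&cq, ops_each] {
      for (int i = 1; i <= ops_each; ++i)
        cq.Enqueue(reinterpret_cast<void*>(static_cast<std::intptr_t>(i)));
    });
  }
  for (auto& w : workers) w.join();  // 1. no thread can start new work now
  cq.Shutdown();                      // 2. so shutting down is safe
  poller.join();                      // 3. Next() drains, then returns false
  return {delivered, cq.late_enqueues()};
}
```

For example, RunPollerDemo(3, 5) delivers all 15 tags and reports 0 late enqueues, because every producer has finished before Shutdown() is called.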

    I hope this information is helpful.