Tags: c++, grpc, etcd

gRPC and etcd client


This question involves etcd-specific details, but I think it is really about working with gRPC in general. I'm trying to create an etcd Watch for some keys. Since the documentation is sparse, I had a look at the Nokia implementation. It was easy to adapt the code to my needs, and I came up with a first version that worked just fine: it created a WatchCreateRequest and fired the callback on key updates. So far so good. Then I tried to add more than one key to watch. Fiasco! ClientAsyncReaderWriter fails to Read/Write in that case. Now to the question.

If I have the following members in my class

Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;

and I want to support multiple Watches added to my class, I guess I have to hold several of these variables per Watch rather than as class members. First of all, I guess, WatchResponse reply should be one per Watch. I'm less sure about the stream: should I hold one per Watch? I'm almost sure that context could be reused for all Watches, and I'm 100% sure the stub and completionQueue can be reused for all Watches. So the question is: is my guesswork right? What about thread safety? I didn't find any documentation describing which objects are safe to use from multiple threads and where I have to synchronize access. Any link to documentation (not this one) will be appreciated!

Test code, before I split the members into per-Watch properties (no proper shutdown, I know):

#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <unordered_map>

#include <grpcpp/grpcpp.h>
// plus the headers generated from etcd's rpc.proto, which provide
// Watch::Stub, WatchRequest, WatchResponse and WatchCreateRequest

using namespace grpc;
using namespace etcdserverpb;   // namespace of the etcd-generated proto types
class Watcher
{
public:
    using Callback = std::function<void(const std::string&, const std::string&)>;

    Watcher(std::shared_ptr<Channel> channel) : watchStub(channel)
    {
        stream = watchStub.AsyncWatch(&context, &completionQueue, (void*) "create");
        eventPoller = std::thread([this]() { WaitForEvent(); });
    }

    void AddWatch(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, false);
    }

    void AddWatches(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, true);
    }

private:
    void AddWatch(const std::string& key, Callback callback, bool isRecursive)
    {
        auto insertionResult = callbacks.emplace(key, callback);
        if (!insertionResult.second) {
            throw std::runtime_error("Event handle already exist.");
        }
        WatchRequest watch_req;
        WatchCreateRequest watch_create_req;
        watch_create_req.set_key(key);
        if (isRecursive) {
            watch_create_req.set_range_end(key + "\xFF");
        }

        watch_req.mutable_create_request()->CopyFrom(watch_create_req);
        stream->Write(watch_req, (void*) insertionResult.first->first.c_str());

        stream->Read(&reply, (void*) insertionResult.first->first.c_str());
    }

    void WaitForEvent()
    {
        void* got_tag;
        bool ok = false;

        while (completionQueue.Next(&got_tag, &ok)) {
            if (ok == false) {
                break;
            }
            if (got_tag == (void*) "writes done") {
                // Signal shutdown
            }
            else if (got_tag == (void*) "create") {
            }
            else if (got_tag == (void*) "write") {
            }
            else {

                auto tag = std::string(reinterpret_cast<char*>(got_tag));
                auto findIt = callbacks.find(tag);
                if (findIt == callbacks.end()) {
                    throw std::runtime_error("Key \"" + tag + "\"not found");
                }

                if (reply.events_size()) {
                    ParseResponse(findIt->second);
                }
                stream->Read(&reply, got_tag);
            }
        }
    }

    void ParseResponse(Callback& callback)
    {
        for (int i = 0; i < reply.events_size(); ++i) {
            const auto& event = reply.events(i);
            callback(event.kv().key(), event.kv().value());
        }
    }

    Watch::Stub watchStub;
    CompletionQueue completionQueue;
    ClientContext context;
    std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
    WatchResponse reply;
    std::unordered_map<std::string, Callback> callbacks;
    std::thread eventPoller;
};

Solution

  • I'm sorry, but I'm not very sure about the proper Watch design here; it's not clear to me whether you want to create a separate gRPC call for each Watch.

    Anyway, each gRPC call will have its own ClientContext and ClientAsyncReaderWriter, but the stub and the CompletionQueue are not per-call objects (see the first sketch after this answer).

    As far as I know, there is no central place that lists which classes are thread-safe. You may want to read the API documentation of each class to set the right expectations.

    When I was writing the async server-side load reporting service, the only place I added synchronization myself was around the CompletionQueue, so that I don't enqueue new tags into the cq after it has been shut down (see the second sketch after this answer).
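
    A minimal sketch of that per-call split, assuming the same includes and etcd-generated types as the test code above; the names WatchCall, MultiWatcher and calls are hypothetical, not part of the gRPC or etcd API:

    struct WatchCall
    {
        ClientContext context;   // one per gRPC call
        std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
        WatchResponse reply;     // buffer filled by this call's Read()
        std::function<void(const std::string&, const std::string&)> callback;
    };

    class MultiWatcher
    {
    public:
        explicit MultiWatcher(std::shared_ptr<Channel> channel) : stub(channel) {}

        void AddWatch(const std::string& key,
                      std::function<void(const std::string&, const std::string&)> callback)
        {
            auto call = std::make_unique<WatchCall>();
            call->callback = std::move(callback);
            // The WatchCall pointer itself is the completion-queue tag, which
            // avoids comparing string-literal pointers in the polling loop.
            call->stream = stub.AsyncWatch(&call->context, &cq, call.get());
            // The WatchCreateRequest Write() and the first Read() would be
            // issued from the polling loop once this "create" tag completes.
            calls.emplace(key, std::move(call));
        }

    private:
        Watch::Stub stub;      // shared by all calls
        CompletionQueue cq;    // shared by all calls, polled by one thread
        std::unordered_map<std::string, std::unique_ptr<WatchCall>> calls;
    };

    The polling thread casts the tag back to a WatchCall* and re-issues Read() on that call's stream into that call's reply, so no per-call state is shared between streams.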
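
    And a minimal sketch of the synchronization around the CompletionQueue, assuming <mutex> is also included; GuardedCompletionQueue and its members are hypothetical names:

    class GuardedCompletionQueue
    {
    public:
        // Runs enqueueOp (e.g. a lambda that calls stream->Write() or
        // stream->Read(), each of which adds a tag to the queue) only while
        // the queue is still accepting work.
        bool TryEnqueue(const std::function<void()>& enqueueOp)
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (shutdownCalled) {
                return false;   // the caller knows the tag was never started
            }
            enqueueOp();
            return true;
        }

        void Shutdown()
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (!shutdownCalled) {
                shutdownCalled = true;
                cq.Shutdown();  // Next() drains pending tags, then returns false
            }
        }

        CompletionQueue* queue() { return &cq; }

    private:
        std::mutex mutex;
        bool shutdownCalled = false;
        CompletionQueue cq;
    };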