This question involves etcd-specific details, but I think it is really about working with gRPC in general.
I'm trying to create an etcd Watch for some keys. Since the documentation is sparse, I had a look at the Nokia implementation. It was easy to adapt the code to my needs, and I came up with a first version that worked just fine: it created a WatchCreateRequest and fired a callback on key update. So far so good. Then I tried to add more than one key to watch. Fiasco! ClientAsyncReaderWriter fails to Read/Write in that case. Now to the question.
If I have the following members in my class
Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;
and I want to support multiple Watches added to my class, I guess I have to hold several of these variables per Watch rather than as class members. First of all, I guess WatchResponse reply should be one per Watch. I'm less sure about the stream: should I hold one per Watch? I'm almost sure that context could be reused for all Watches, and 100% sure that the stub and completionQueue can be reused for all Watches.
So the question is: is my guesswork right? What about thread safety? I didn't find any documentation describing which objects are safe to use from multiple threads and where I have to synchronize access.
Any link to documentation (not this one) will be appreciated!
Test code, before I split the members into per-Watch properties (no proper shutdown, I know):
using namespace grpc;

class Watcher
{
public:
    using Callback = std::function<void(const std::string&, const std::string&)>;

    Watcher(std::shared_ptr<Channel> channel) : watchStub(channel)
    {
        stream = watchStub.AsyncWatch(&context, &completionQueue, (void*) "create");
        eventPoller = std::thread([this]() { WaitForEvent(); });
    }

    void AddWatch(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, false);
    }

    void AddWatches(const std::string& key, Callback callback)
    {
        AddWatch(key, callback, true);
    }

private:
    void AddWatch(const std::string& key, Callback callback, bool isRecursive)
    {
        auto insertionResult = callbacks.emplace(key, callback);
        if (!insertionResult.second) {
            throw std::runtime_error("Event handle already exist.");
        }
        WatchRequest watch_req;
        WatchCreateRequest watch_create_req;
        watch_create_req.set_key(key);
        if (isRecursive) {
            watch_create_req.set_range_end(key + "\xFF");
        }
        watch_req.mutable_create_request()->CopyFrom(watch_create_req);
        stream->Write(watch_req, (void*) insertionResult.first->first.c_str());
        stream->Read(&reply, (void*) insertionResult.first->first.c_str());
    }

    void WaitForEvent()
    {
        void* got_tag;
        bool ok = false;
        while (completionQueue.Next(&got_tag, &ok)) {
            if (ok == false) {
                break;
            }
            if (got_tag == (void*) "writes done") {
                // Signal shutdown
            }
            else if (got_tag == (void*) "create") {
            }
            else if (got_tag == (void*) "write") {
            }
            else {
                auto tag = std::string(reinterpret_cast<char*>(got_tag));
                auto findIt = callbacks.find(tag);
                if (findIt == callbacks.end()) {
                    throw std::runtime_error("Key \"" + tag + "\"not found");
                }
                if (reply.events_size()) {
                    ParseResponse(findIt->second);
                }
                stream->Read(&reply, got_tag);
            }
        }
    }

    void ParseResponse(Callback& callback)
    {
        for (int i = 0; i < reply.events_size(); ++i) {
            auto event = reply.events(i);
            auto key = event.kv().key();
            callback(event.kv().key(), event.kv().value());
        }
    }

    Watch::Stub watchStub;
    CompletionQueue completionQueue;
    ClientContext context;
    std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
    WatchResponse reply;
    std::unordered_map<std::string, Callback> callbacks;
    std::thread eventPoller;
};
I'm sorry, but I'm not sure what the proper Watch design is here. It's not clear to me whether you want to create a gRPC call for each Watch.
Anyway, each gRPC call will have its own ClientContext and ClientAsyncReaderWriter. The stub and the CompletionQueue, however, are not per-call things.
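A minimal sketch of that split, assuming you do go with one call per Watch (which may or may not be what you want). FakeContext, FakeStream and FakeResponse are stand-ins for ClientContext, ClientAsyncReaderWriter and WatchResponse so that the snippet compiles without gRPC; WatchCall and WatchRegistry are hypothetical names, not part of gRPC or etcd. The address of the per-call object doubles as the completion-queue tag, so the polling loop can find the right reply buffer and callback without comparing string pointers.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Stand-ins (assumptions) for the gRPC/etcd types, so this compiles on its own.
struct FakeContext {};                          // would be grpc::ClientContext
struct FakeStream {};                           // would be ClientAsyncReaderWriter<...>
struct FakeResponse { std::string key, value; }; // would be WatchResponse

using Callback = std::function<void(const std::string&, const std::string&)>;

// Everything that belongs to ONE call lives together.
struct WatchCall {
    std::string key;
    Callback callback;
    FakeContext context;                 // one per call
    std::unique_ptr<FakeStream> stream;  // one per call
    FakeResponse reply;                  // buffer for this call's pending Read
};

// Hypothetical owner of all per-call state. The stub and the completion
// queue would live next to it and be shared by every call.
class WatchRegistry {
public:
    // Creates the per-call state and returns its address, to be used as the tag.
    void* Add(const std::string& key, Callback callback) {
        auto call = std::make_unique<WatchCall>();
        call->key = key;
        call->callback = std::move(callback);
        call->stream = std::make_unique<FakeStream>();
        void* tag = call.get();
        bool inserted = calls_.emplace(tag, std::move(call)).second;
        assert(inserted);  // heap addresses of live objects are unique
        (void)inserted;
        return tag;
    }

    // Looks up the per-call state for a tag; nullptr if the tag is unknown.
    WatchCall* Find(void* tag) {
        auto it = calls_.find(tag);
        return it == calls_.end() ? nullptr : it->second.get();
    }

    // Called from the polling loop with the tag the queue handed back.
    bool Dispatch(void* tag) {
        WatchCall* call = Find(tag);
        if (call == nullptr) return false;
        call->callback(call->reply.key, call->reply.value);
        return true;
    }

private:
    std::unordered_map<void*, std::unique_ptr<WatchCall>> calls_;
};
```

In real code only the contents of WatchCall are duplicated per call. As far as I can tell from the async streaming reference, a stream also allows only one outstanding Write at a time, so each Write has to complete on the queue before the next one is started on the same stream.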
As far as I know, there is no central place that lists the thread-safe classes. You may want to read the API documentation for each class to get the right expectations.
When I was writing the async server load reporting service, the only place I added synchronization myself was around the CompletionQueue, so that I don't enqueue new tags to the cq after it has been shut down.
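Roughly, that guard can look like the sketch below. ShutdownGuard is a hypothetical helper, not something gRPC provides, and FakeQueue is a stand-in so the snippet compiles without gRPC; the only thing assumed about the real queue type is that it has a Shutdown() method, which grpc::CompletionQueue does.

```cpp
#include <mutex>
#include <utility>

// Hypothetical helper: serializes "start an operation that enqueues a tag"
// against "shut the queue down", so nothing is enqueued after shutdown.
template <typename Queue>
class ShutdownGuard {
public:
    explicit ShutdownGuard(Queue& queue) : queue_(queue) {}

    // Runs start_op (e.g. a lambda calling stream->Read(&reply, tag)) only
    // while the queue is still open. Returns false if the call was skipped.
    template <typename StartOp>
    bool TryEnqueue(StartOp&& start_op) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (shut_down_) return false;
        std::forward<StartOp>(start_op)();
        return true;
    }

    // Shuts the queue down exactly once; later calls are no-ops.
    void Shutdown() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (shut_down_) return;
        shut_down_ = true;
        queue_.Shutdown();
    }

private:
    Queue& queue_;
    std::mutex mutex_;
    bool shut_down_ = false;
};

// Stand-in (assumption) for grpc::CompletionQueue.
struct FakeQueue {
    int pending = 0;
    bool closed = false;
    void Shutdown() { closed = true; }
};
```

The polling thread keeps calling Next() without taking the lock; only the code that starts new operations and the code that shuts down go through the guard.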