Search code examples
c++redishorizontal-scalinguwebsockets

Using Redis with uWebSockets C++ server


I currently have a C++ web sockets server using uWebSockets. I want to scale it horizontally using Redis. It means that I'll use this Redis client. However, I'm facing a problem with the implementation of pub/sub channels. Indeed, since the Redis channel subscription needs its own event loop (according to this example), and obviously the same for the uWebSockets app (see this example), I end up with two event loops. And my problem is that I don't know how to manage running these two loops properly.

I tried running them on two different threads, which works if they are totally independent of each other. However, since I want to broadcast the upcoming Redis message to all web sockets client, I need the uWebSockets app instance (see this example) in the Redis thread to broadcast it:

Subscriber sub = redis->subscriber();

sub.on_message([](std::string channel, std::string msg){
    app->publish("broadcast", msg, (uWS::OpCode)1);
});

Therefore the two event loops are not independant of each other and when I received a message from Redis, it takes about 5 seconds before it is handled by the uWebSockets app.

Does someone know how to properly set up this Redis pus/sub feature ? Thank you for your help.


Solution

  • I managed to solve my problem.

    I found that calling app->publish(...) in my second thread was not thread-safe. Indeed, an interesting post showed me that in order to access the app from another thread, we have to use the method defer on the event loop. Therefore, the structure becomes:

    ...
    
    uWS::SSLApp *app = nullptr;
    uWS::Loop *loop = nullptr;
    
    Redis *redis = nullptr;
    
    ...
    
    void redisEventLoopThread(Subscriber *sub) {
    
        sub->on_message([](string channel, string msg) {
            loop->defer([msg]() {
                app->publish(channel, msg, ...);
            });
        });
    
        sub->subscribe("channel_name");
    
        while (true) {
            try {
                sub->consume();
            } catch (const Error &err) {...}
        }
    }
    
    ...
    
    int main() {
        app = new uWS::SSLApp();
        loop = uWS::Loop::get();
    
        redis = new Redis(...);
        Subscriber sub = redis->subscriber();
    
        thread redisThread(redisEventLoopThread, &sub);
    
        app->ws<...>(...).listen(...).run();
    
        ...
    }