Search code examples
c++websocketc++17uwebsockets

uWebSockets C++ error on send(): cork buffer must not be acquired without checking cancork


I'm developing a websoscket server for an embedded system. The requirements are very simple: send updates to a list of signals ids the webapp subscribes to. Everything works fine until the crash happens. The problem seems to be part of the corking mechanism and I've tried to reproduce it under different conditions but the problem still occurs. In my implementation I have a custom pub/sub because I want to deliver more signal updates at once in one message: this is realized in a separate thread that sends asynchronous messages to the connected clients with a frequency of 10 times per second (with the available updates). The problem seems to happen when there is a collision with the send and message callback.

This is my app setup, very similar to default:

    auto app
        = uWS::App()
              .ws<ClientData>(
                  "/*", { /* Settings */
                          .compression      = uWS::CompressOptions(uWS::DEDICATED_COMPRESSOR_4KB | uWS::DEDICATED_DECOMPRESSOR),
                          .maxPayloadLength = 100 * 1024 * 1024,
                          .idleTimeout      = 16,
                          .maxBackpressure  = 100 * 1024 * 1024,
                          .closeOnBackpressureLimit = false,
                          .resetIdleTimeoutOnSend   = false,
                          .sendPingsAutomatically   = true,
                          /* Handlers */
                          // handshaking callback
                          .upgrade =
                              [this](auto *res, auto *req, auto *context)
                          {
                              auto subprotocols = String(req->getHeader("sec-websocket-protocol"));
                              auto jwt          = this->GetJwtFromSubprotocols(subprotocols);

                              if(!this->ValidateJwt(jwt))
                              {
                                  res->writeStatus("401");
                                  res->end("Invalid jwt");
                                  return;
                              }

                              if(this->IsMaximumCapacityReached())
                              {
                                  res->writeStatus("503");
                                  res->end("Maximum number of clients reached");
                                  return;
                              }

                              res->upgrade(ClientData{}, req->getHeader("sec-websocket-key"),
                                           req->getHeader("sec-websocket-protocol"), req->getHeader("sec-websocket-extensions"),
                                           context);
                          },
                          .open    = [this](auto *ws) { this->OnOpen(ws); },
                          .message = [this](auto *ws, std::string_view message, uWS::OpCode) { this->OnMessage(ws, message); },
                          .dropped =
                              [](auto * /*ws*/, std::string_view /*message*/, uWS::OpCode /*opCode*/)
                          {
                              /* A message was dropped due to set maxBackpressure and closeOnBackpressureLimit limit */
                          },
                          .drain =
                              [](auto * /*ws*/)
                          {
                              /* Check ws->getBufferedAmount() here */
                          },
                          .ping =
                              [](auto * /*ws*/, std::string_view)
                          {
                              /* Not implemented yet */
                          },
                          .pong =
                              [](auto * /*ws*/, std::string_view)
                          {
                              /* Not implemented yet */
                          },
                          .close = [this](auto *ws, int /*code*/, std::string_view /*message*/) { this->OnClose(ws); } })
              .listen("127.0.0.1", 50000,
                      [](auto *listen_socket)
                      {
                          if(listen_socket)
                          {
                              LogDebug("Ready on port 50000");
                          }
                      });

The Sender thread cyclically executes this code protected by mutex:

Sync lock(*_clientsMutex);
for(auto client : *_clients)
{
    SendRealtimeUpdate(client);
}

the SendRealtimeUpdate method after some elaboration provides a msg as std::string and sends it client->send(msg, uWS::OpCode::TEXT);

I think I'm missing something maybe there is a proper sync to implement between the message callback and the send of my thread that can happen in any moment.

I tried to use the uWebSockets pub/sub internal mechanism using the signal id as topic and it automatically manages the send when it is ready and this works. However this is inefficient because I want to send in a message multiple signals update and not one.


Solution

  • Swift - Friday Pie's comment is correct. uWebSockets is not thread-safe except for very few functions such as timers and "defer".

    Your comment asked:

    need to send a message every 1second to all connected clients

    Below is a thread-safe way to use a timer to send a message.

    #include "App.h"
       
    using namespace std;
    
    struct PerSocketData {};
    
    uWS::WebSocket<false, true, PerSocketData> *gws=nullptr;
        
    int main() {
       auto loop = uWS::Loop::get();
    
       struct us_timer_t *delayTimer = us_create_timer((struct us_loop_t *) loop, 0, 0);
    
       us_timer_set(delayTimer, [](struct us_timer_t *) {
                                   if (gws) {
                                      cout << "calling send" << endl;
                                      gws->send("from server", uWS::OpCode::TEXT);
                                   }
                                }, 1000, 1000);
       
       uWS::App app;
    
       app.ws<PerSocketData>("/*", {
             .idleTimeout = 0,
             .sendPingsAutomatically = false,
             .open = [](auto *ws) {
                        gws = ws;
                     },
             .close = [](auto */*ws*/, int /*code*/, std::string_view /*message*/) {
                         gws = nullptr;
                      }
          }).listen(9001, [](auto *) {
                          });
    
       app.run();
    }