
Why would nng_dial ever hang?


I'm running into a perplexing issue with nng (EDIT: and gtest) in C++. The first code block below is a simple program that creates a publisher and continuously sends messages. The second code block is a simple program that successfully subscribes to this publisher and receives its messages, so I can confidently say the problem is not in how I set up the publisher.

The third code block contains two tests I wrote to run with gtest. The second test always hangs on nng_dial, even though I've done everything possible to clean up resources by that point. nng_dial should return immediately, even on failure. Does anyone know why it would ever hang (and only in a gtest context)?

I have also tried the equivalent approach of creating a dialer and starting it explicitly, with the same result.

Thanks for any help!

// Publisher: listens on URL and sends one message per second
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <iostream>
#include <cstring>
#include <string>
#include <chrono>
#include <thread>

#define URL "ipc:///tmp/tc3-pub"

int main() {
    nng_socket sock;
    int rv;

    // Open the socket
    if ((rv = nng_pub0_open(&sock)) != 0) {
        std::cout << "Failed to open nng socket\n";
        return rv;
    }

    // Listen for subscribers
    if ((rv = nng_listen(sock, URL, NULL, 0)) != 0) {
        std::cout << "Failed to listen on nng socket\n";
        return rv;
    }

    std::cout << "Socket is listening!\n";
    
    for (int i = 0; i < 60; ++i) {
        std::string msg = "Message " + std::to_string(i);
        if ((rv = nng_send(sock, (void *) msg.c_str(), msg.size() + 1, 0)) != 0) {
            std::cout << "Failed to send message\n";
            return rv;
        }

        std::cout << "Sent: " << msg << "\n";

        // Sleep for one second
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    // Cleanup the socket
    nng_close(sock);

    return 0;
}

// Subscriber: dials the publisher and prints every message it receives
#include <nng/nng.h>
#include <nng/protocol/pubsub0/sub.h>
#include <nng/protocol/pubsub0/pub.h>
#include <iostream>
#include <cstring>
#include <string>
#include <thread>

#define URL "ipc:///tmp/tc3-pub"

int main() {
    nng_socket sock;
    int rv;

    // Open the socket
    if ((rv = nng_sub0_open(&sock)) != 0) {
        std::cerr << "Failed to open nng socket\n";
        return rv;
    }

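    // Subscribe to every topic (an empty prefix matches all messages)
    if ((rv = nng_socket_set(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
        std::cerr << "Failed to subscribe on nng socket\n";
        return rv;
    }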

    std::cout << "Opened a socket!\n";
    
    // Dial (connect to) the server
    if ((rv = nng_dial(sock, URL, NULL, 0)) != 0) {
        std::cerr << "Failed to dial on nng socket\n";
        return rv;
    }

    // Receive and print messages continuously until an error occurs
    while (true) {
        char* buf = NULL;
        size_t size = 0;
        rv = nng_recv(sock, &buf, &size, NNG_FLAG_ALLOC);
        if (rv != 0) {
            std::cerr << "Failed to receive message\n";
            break;
        }

        std::cout << "Received message: " << std::string(buf, size) << '\n';
        nng_free(buf, size);
    }

    nng_close(sock);

    return rv;
}

// gtest suite: Test2 hangs on nng_dial
#include "gtest/gtest.h"

#include <cstdlib>
#include <exception>
#include <filesystem>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

#include "nng/nng.h"
#include "nng/protocol/pubsub0/pub.h"
#include "nng/protocol/pubsub0/sub.h"

TEST(NngTest, Test1)
{
  nng_socket sock;
  int rv;
    
  if ((rv = nng_sub0_open(&sock)) != 0) {
      std::cout << "Failed to open nng socket\n";
  }
  std::cout << "nng_sub0_open done\n";

  nng_close(sock);
  nng_fini();
}

TEST(NngTest, Test2)
{
  nng_socket sock;
  int rv;
    
  if ((rv = nng_sub0_open(&sock)) != 0) {
      std::cout << "Failed to open nng socket\n";
  }
  std::cout << "nng_sub0_open done\n";

  if ((rv = nng_socket_set(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {
      std::cout << "Failed to set nng socket\n";
  }
  std::cout << "nng_socket_set done\n";

  if ((rv = nng_dial(sock, "ipc:///tmp/tc4-pub", NULL, 0)) != 0) {
      std::cout << "Failed to dial nng socket\n";
  }
  std::cout << "nng_dial done\n";

  nng_close(sock);
  nng_fini();
}


int main(int argc, char** argv)
{
  try {
    ::testing::InitGoogleTest(&argc, argv);

    int retval = RUN_ALL_TESTS();
    std::cout << "Finished running all tests\n";

    return retval;
  } catch (const std::exception& e) {
    std::cerr << "Caught exception: " << e.what() << '\n';
  } catch (...) {
    std::cerr << "Caught unknown exception\n";
  }

  return -1;
}

Solution

  • nng_fini turned out to be the culprit. From the header comment in the nng GitHub repository (it's not documented on their website):

    // nng_fini is used to terminate the library, freeing certain global resources.
    // This should only be called during atexit() or just before dlclose().
    // THIS FUNCTION MUST NOT BE CALLED CONCURRENTLY WITH ANY OTHER FUNCTION
    // IN THIS LIBRARY; IT IS NOT REENTRANT OR THREADSAFE.
    //
    // For most cases, this call is unnecessary, but it is provided to assist
    // when debugging with memory checkers (e.g. valgrind).  Calling this
    // function prevents global library resources from being reported incorrectly
    // as memory leaks.  In those cases, we recommend doing this with atexit().
    NNG_DECL void nng_fini(void);
    

    In my case, nng_fini was being invoked when the previous test finished. The code I shared above calls it directly at the end of each test; in the production code it was tucked away in a class's shutdown path.

    Strangely enough, this caused no trouble on Linux, but as soon as I ported the library to Windows and ran the tests, nng_dial started hanging after the first call to nng_fini (that is, once the first test finished).

    So, nng users beware. I would suggest not calling nng_fini unless you really need to. I reviewed its use in our code and don't believe it was necessary.
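If you do need nng_fini (for example, to keep valgrind quiet), the header comment quoted above recommends running it from atexit(). Below is a minimal sketch of that idea, not a tested fix. To keep it buildable without nng installed, library_fini is a stand-in for the real nng_fini call; register_fini_once and g_fini_calls are hypothetical names introduced here purely for illustration.

```cpp
#include <cassert>
#include <cstdlib>
#include <mutex>

// Stand-in for nng_fini() so this sketch builds without nng installed.
// In real code: #include <nng/nng.h> and call nng_fini() here instead.
int g_fini_calls = 0;  // hypothetical counter, for illustration only

void library_fini() {
    // The library teardown must never run twice in one process.
    assert(g_fini_calls == 0);
    ++g_fini_calls;
}

// Hypothetical helper: safe to call from every test fixture or class
// shutdown path. Only the first call registers the atexit handler, so
// the teardown runs once, at normal process exit, and never between tests.
void register_fini_once() {
    static std::once_flag flag;
    std::call_once(flag, [] { std::atexit(library_fini); });
}
```

Calling register_fini_once() from the class shutdown path (in place of calling nng_fini directly) means the first test finishing no longer tears the library down before the second test dials.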