Search code examples
czeromq

ZeroMQ: Is there no other way than polling or sleep to check if a socket is connected?


We are using ZeroMQ for a project, with the following architecture: enter image description here

This is basically our module we wrote in C for using ZeroMQ:

#define PUB_ADDR "ipc:///tmp/broker_frontend"
#define SUB_ADDR "ipc:///tmp/broker_backend"
#define PUB_MON_ADDR "inproc://monitor-publisher"
#define SUB_MON_ADDR "inproc://monitor-subscriber"

#define RET_ERROR(x) syslog(LOG_ERR, x "%s", zmq_strerror(errno)); cleanupZMQ(); return false;

static zmq_ctx_t ctx = NULL;
static zmq_sock_t pub_sock = NULL;
static zmq_sock_t sub_sock = NULL;
static zmq_sock_t sub_mon_sock = NULL;
static zmq_sock_t pub_mon_sock = NULL;

static char* subscriptions[] = { REQ_TOPIC };
static size_t subscriptions_count = (sizeof(subscriptions) / sizeof(subscriptions[0]));

static inline void cleanupZMQ() {
    if (pub_sock) {
        zmq_close(pub_sock);
    }

    if (sub_sock) {
        zmq_close(sub_sock);
    }

    if (sub_mon_sock) {
        zmq_close(sub_mon_sock);
    }

    if (pub_mon_sock) {
        zmq_close(pub_mon_sock);
    }

    if (ctx) {
       zmq_ctx_destroy(ctx);
    }
}

static bool waitForConnect(zmq_sock_t monitor) {
    zmq_msg_t msg;

    bool ret = true;
    bool connected = false;
    bool handshaked = false;
    do {
        zmq_msg_init(&msg);
        int rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive first frame from monitor: ")
        }
        uint8_t* data = (uint8_t*)zmq_msg_data(&msg);
        uint16_t event = *((uint16_t*)data);
        uint32_t value = *((uint32_t*)data+2);

        zmq_msg_init(&msg);
        rc = zmq_msg_recv(&msg, monitor, 0);
        if (rc < 0) {
            RET_ERROR("Error! Can not receive second frame from monitor: ")
        }
        char* addr = (char*)zmq_msg_data(&msg);
        syslog(LOG_DEBUG, "Event: %u, Value: %u, Addr: %s", event, value, addr);
        if (event == ZMQ_EVENT_CONNECTED) {
            syslog(LOG_INFO, "Connected to '%s'.", addr);
            connected = true;
        } else if (event == ZMQ_EVENT_CONNECT_DELAYED) {
            syslog(LOG_NOTICE, "Connecting delayed!");
        } else if (event == ZMQ_EVENT_CONNECT_RETRIED) {
            syslog(LOG_NOTICE, "Connecting retried!");
        } else if ((event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL)
            || (event == ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL)
            || (event == ZMQ_EVENT_HANDSHAKE_FAILED_AUTH)) {
            syslog(LOG_ERR, "Error! Handshake with '%s' failed: %s", addr, zmq_strerror(value));
            handshaked = true;
            ret = false;
        } else if (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED) {
            syslog(LOG_INFO, "Handshake with '%s' succeded.", addr);
            handshaked = true;
            ret = true;
        } else {
            syslog(LOG_NOTICE, "Unexpected event: %u", event);
        }
    } while(!(connected && handshaked));

    zmq_msg_close(&msg);
    return ret;
}

bool startZMQ() {
    if (ctx == NULL) {
        ctx = zmq_ctx_new();
        if (ctx == NULL) {
            RET_ERROR("Error! Can not open ZMQ context: ");
        }
    } else {
        syslog(LOG_INFO, "ZMQ is already started.");
    }

    if (sub_sock == NULL) {
        sub_sock = zmq_socket(ctx, ZMQ_SUB);
        if (sub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub socket: ");
        }

        if (zmq_socket_monitor(sub_sock, SUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ sub socket: ");
        }

        sub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (sub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ sub-monitor socket: ");
        }

        if (zmq_connect(sub_mon_sock, SUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub-monitor socket: ");
        }

        for (size_t i=0; i<subscriptions_count; i++) {
            if (zmq_setsockopt(sub_sock, ZMQ_SUBSCRIBE, subscriptions[i], strlen(subscriptions[i]))) {
                syslog(LOG_ERR, "Error! Can not subscribe to topic '%s': %s", subscriptions[i], zmq_strerror(errno));
            } else {
                syslog(LOG_INFO, "Subscribed to '%s'.", subscriptions[i]);
            }
        }

        if (zmq_connect(sub_sock, SUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ sub socket: ");
        }
        waitForConnect(sub_mon_sock);
    } else {
        syslog(LOG_INFO, "Subscriber socket is already open.");
    }

    if (pub_sock == NULL) {
        pub_sock = zmq_socket(ctx, ZMQ_PUB);
        if (pub_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub socket: ");
        }

        if (zmq_socket_monitor(pub_sock, PUB_MON_ADDR, ZMQ_EVENT_ALL)) {
            RET_ERROR("Error! Can not monitor ZMQ pub socket: ");
        }

        pub_mon_sock = zmq_socket(ctx, ZMQ_PAIR);
        if (pub_mon_sock == NULL) {
            RET_ERROR("Error! Can not open ZMQ pub-monitor socket: ");
        }

        if (zmq_connect(pub_mon_sock, PUB_MON_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub-monitor socket: ");
        }

        if (zmq_connect(pub_sock, PUB_ADDR)) {
            RET_ERROR("Error! Can not connect ZMQ pub socket: ");
        }

        waitForConnect(pub_mon_sock);
    } else {
        syslog(LOG_INFO, "Publisher socket is already open.");
    }

    sleep(3);
    return true;
}

size_t sendZMQMsg(const char* topic, size_t topic_len, const msg_buffer_t msg, size_t msg_len) {
    size_t sended = 0;
    int rc = zmq_send(pub_sock, topic, topic_len, ZMQ_SNDMORE);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ topic: %s", zmq_strerror(errno));
        return 0;
    }
    sended += rc;
    rc = zmq_send(pub_sock, msg, msg_len, 0);
    if (rc < 0) {
        syslog(LOG_ERR, "Error! Could not send ZMQ message: %s", zmq_strerror(errno));
        return 0;
    }
    sended += rc;
    return sended;
}

void endZMQ() {
    cleanupZMQ();
}

As you can see, we had to add an sleep(3) at the end of our startZMQ() function. Without this the first few messages sent would get lost.

Of course we know about this 'slow joiner syndrome'. We ensure that the broker is ready before connecting anything to it and the subscribers are connected (also with the three second delay) before the publishers. But still, we have to wait these three seconds before the publishers can use their sockets. The publishers and subscribers do not know each other due to the central broker and we do not want that they have to connect directly to each other, since we have a lot of both parts and if anyone had to directly connect to anyone else, the system would basically unmaintainable.

We found this Question and this one and of course we read the Guide, especially this Part, where in the guide themself a sleep(1) was used and than a polling with a second socket-pair is recommended. Is there really no other way in this library to check if your socket is ready, than polling it?

As you can see we already catching the zmq-events in our waitForConnect function using zmq-monitor-sockets. Should this not be enough? Are we missing something here?


Solution

  • All explanations refer to our pub-sub architecture with the central broker.

    As described, waiting for the signals ZMQ_EVENT_CONNECTED and ZMQ_EVENT_HANDSHAKE_SUCCEEDED were not enough, to ensure that the publisher socket is usable. Further test showed, that the subscriber sockets suffer from the same problem.

    To be clear: we do not know how ZeroMQ can be properly used if you have no statement about it, if your socket is ready for usage.
    You can just start sending, and ignoring that the first messages are lost, not even buffered or something similar, just lost.
    You can wait a small amount of time, as shown in this Part of the documentation, but as described in the same Part of the documentation, it is unreliable, because you will never know for sure if the amount of time was enough and your sockets are really ready.

    The solution we came up with is the following: we added a neutral node to the system.
    This node is connected to the broker, like everyone else, but does nothing more than answer synchronization requests from every other node in the system.
    These requests only contain the pid of the sending node, which will be copied into the response for the request. So can every node identify, if its own request was acknowledged.

    This solution brings two drawbacks:

    1. There are no more clear subscribers and publishers. Every node has to open a publisher socket for sending the synchronization requests and a subscriber socket for receiving the synchronization response. At least you can close them after the synchronization.
    2. It makes a lot of message overhead in the system. Especially during the booting of the system we can see a lot more cpu load, than before.

    Also it exacerbates the problem, that you do not know if your counterpart is ready. If your nodes get the information, if the counterpart is up from an other source, than sending messages (i.e. from top or looking into /proc), you still do not know if the ZeroMQ connections for this counterpart are ready.
    So you have to decide for every publisher, if it just sends a stream and do not care about, if someone listens or not, or you have to acknowledge every message by the counterpart.