
Using the majordomo broker with asynchronous clients


While reading the ZeroMQ guide, I came across client code that sends 100k requests in one loop and then receives the replies in a second loop.

#include "../include/mdp.h"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
    int count;
    //  Phase 1: send every request without reading any replies
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdp_client_send (session, "echo", &request);
    }
    printf ("sent all\n");

    //  Phase 2: only now start collecting the replies
    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdp_client_recv (session, NULL, NULL);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  Interrupted by Ctrl-C
        printf ("reply received:%d\n", count);
    }
    printf ("%d replies received\n", count);
    mdp_client_destroy (&session);
    return 0;
}

I added one counter in the worker (test_worker.c) for the replies it sends to the broker, and another in mdp_broker.c for the replies the broker sends to the client. Both reach 100k, but the client receives only about 37k replies.

If I reduce the number of requests to around 40k, the client receives all the replies. Can someone please explain why replies are lost when the client sends more than about 40k asynchronous requests?

I tried setting the HWM to 100k for the broker socket, but the problem persists:

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
    //  ZMQ_SNDHWM and ZMQ_RCVHWM take an int in libzmq 3.x and later;
    //  passing an int64_t makes zmq_setsockopt fail with EINVAL
    int hwm = 100000;

    //  Initialize broker state
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    if (zmq_setsockopt (self->socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)) != 0
    ||  zmq_setsockopt (self->socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)) != 0)
        fprintf (stderr, "E: cannot set HWM: %s\n", zmq_strerror (zmq_errno ()));
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}

Solution

  • With the default HWM and default TCP settings, messages were being lost with as few as 50k messages.

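    The loss happens at the broker: its ROUTER socket silently drops a reply when the queue towards that client is full (at the HWM). The following helped to reduce it: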

    1. Setting the HWM for the zeromq socket.
    2. Increasing the TCP send/receive buffer size.

    This helped only up to a point. With two clients, each sending 100k messages, the broker coped fine, but with three clients, the clients again stopped receiving all of their replies.

    What finally eliminated the loss was changing the design of the client code as follows:

    1. The client sends at most N requests up front. The client's RCVHWM and the broker's SNDHWM should be high enough to hold a total of N messages.
    2. After that, the client sends two new requests for every reply it receives.