Tags: c, network-programming, zeromq, distributed-system, low-latency

ZMQ Multi-part flusher: Can you get the total number of parts in a multi-part message received via ZeroMQ without reading them all?


I am implementing a simple REQ-REP pattern with ZeroMQ in C, using multi-part messaging. Most of my messages have strictly 4 parts each (to and fro), with a few exceptions. To enforce the rule, I need to determine the total number of parts of a received multi-part message. Knowing whether it is <= 4 is easy. Here is my receiver function:

#define BUFMAX  64 // Maximum size of text buffers
#define BUFRCV  63 // Maximum reception size of text buffers (reserve 1 space to add a terminal '\0')

char mpartstr[4][BUFMAX];


int recv_multi(void *socket, int *aremore)

// Receive up to the first 4 parts of a multi-part message into mpartstr[][].
// Returns the number of parts read (up to 4) or <0 if there is an error.
// Returns -1 if there is an error with a zmq function.
// It sets *aremore=1 if there are still more parts to read after the fourth
// part (or *aremore=0 if not).
{
 int len, rc, rcvmore, pdx;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx = 0;
 len = zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
 if(len == -1) return len;
 if(len > BUFRCV) len = BUFRCV; // zmq_recv() returns the full message size even when it truncates

 mpartstr[pdx][len] = '\0';
 rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size);
 if(rc) return -1;

 pdx++;
 if(rcvmore == 0){ *aremore = 0; return pdx; }

 while(rcvmore){
   len = zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
   if(len == -1) return len;
   if(len > BUFRCV) len = BUFRCV; // clamp before writing the terminator
   mpartstr[pdx][len] = '\0';
   rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size);
   if(rc) return -1;
   pdx++;
   if(pdx == 4) break;
 }

 *aremore = rcvmore;
 return pdx;
}

All fine. But in my main() function I then check the value of aremore to see whether more parts are pending. In cases where I am not expecting more, I will send an error message back to the sender. However, I have found that ZeroMQ doesn't like it if I don't read ALL the parts of a multi-part message: the next call to zmq_recv() delivers the remaining parts of the old multi-part message, even after I have sent a message and am expecting a fresh multi-part response.
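
For illustration, the failure mode looks like this in my main() (a trimmed sketch; sock is the already-connected socket and error handling is elided):

int aremore, nparts;

nparts = recv_multi(sock, &aremore);
if(nparts == 4 && aremore){
  /* More than 4 parts: I want to reject the message and move on,   */
  /* but the unread tail stays queued on the socket, so the next    */
  /* receive delivers the stale leftovers instead of a new message: */
  nparts = recv_multi(sock, &aremore); /* <-- old parts, not a fresh request */
}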

So what I really need is a kind of "flush" function to clear out the remaining parts of a message that contains more than 4 parts, all of which I want to discard. So far the only way I have found is an ugly, arbitrary brute-force exhaustion function like the one below (aremore will have a value of 1 to begin with - it was set by the previous function):

int recv_exhaust(void *socket, int *aremore)

// Receive the remainder of a multi-part message and discard the contents.
// Use this to clean out a multi-part 'inbox' from a wrongly sent message.
// Returns 0 on success.
// Returns -1 on zmq function failure.
// Returns -2 on failure to exhaust even after 1000 parts.
{
 int len, rc, rcvmore, pdx;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx = 1;
 rcvmore = *aremore;

 while(rcvmore){
   len = zmq_recv(socket, mpartstr[0], BUFRCV, 0);
   if(len == -1) return len;
   rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size);
   if(rc) return -1;
   pdx++;
   if(pdx > 1000) return -2;
 }

 return 0;
}
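
For completeness, this is how the two functions currently combine in my main() (a trimmed sketch; sock and the error-handling stubs are placeholders):

int aremore, nparts;

nparts = recv_multi(sock, &aremore);
if(nparts < 0){ /* handle zmq error */ }
else if(aremore){
  /* too many parts: drain the tail so the next receive starts clean */
  if(recv_exhaust(sock, &aremore) != 0){ /* handle flush failure */ }
  /* now an error reply can be sent back to the sender */
}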

If there is no dedicated 'flusher' API, then I could at least get rid of my arbitrary 1000-part limit if I had some way of knowing in advance how many parts (in total) a given multi-part message has. Surely ZeroMQ knows this, because multi-part messages are sent as a whole block. Can anyone point me to the way to find that info? Or is there a proper 'flusher' function/method out there? (In standard C please, not C++/C#, etc.) Thanks in advance.


Solution

  • Q : Can anyone point me to the way to find that info?

    Yes.

    Q : is there a proper 'flusher' function/method out there?

    Yes and No :

    From ZeroMQ v2.x up until v4.3.1, there has been no explicit API-call for a "flusher".

    The beauty and the power of the low-latency smart-messaging that the ZeroMQ design delivers are built on a wisely crafted Zen-of-Zero : always preferring performance to comfort, as the Zero-copy, Zero-warranty and other paradigms suggest.

    A naive "flusher" ( and it pains me to trivialise this down to resorting to a primitive blocking recv() ... ) has to go all the way until ZMQ_RCVMORE no longer flags any further parts "beyond" the last frame of the multi-part message ( or until zmq_msg_more() == 0 confirms the same ). Still, all these operations do just pointer-handling; no payload data gets "moved/copied/read" from RAM, just the pointer(s) get assigned, so the loop is indeed both fast and I/O-efficient :

    int    more;
    size_t more_size = sizeof (more);
    do {
        zmq_msg_t part;                       /* Create an empty ØMQ message to hold the message part   */
        int rc = zmq_msg_init (&part);
        assert (rc == 0 && "MSG_INIT failed");
        rc = zmq_msg_recv (&part, socket, 0); /* Block until a message part is available to be received */
        assert (rc != -1 && "MSG_RECV failed");
        rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
        assert (rc == 0 && "GETSOCKOPT failed");       /* Determine if more message parts are to follow */
        zmq_msg_close (&part);
    } while (more);
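
    Wrapped into a helper, the same loop yields a reusable "flusher" with no arbitrary part-limit; a minimal sketch ( the name flush_remaining is ours, not a libzmq API-call; call it only once ZMQ_RCVMORE has already flagged pending parts, otherwise it will block, waiting for a next message ) :

        int flush_remaining (void *socket)
        /* Discard all remaining parts of the current multi-part message. */
        /* Returns 0 on success, -1 if a zmq function fails.              */
        {
            int    more;
            size_t more_size = sizeof (more);
            do {
                zmq_msg_t part;
                if (zmq_msg_init (&part) != 0) return -1;
                if (zmq_msg_recv (&part, socket, 0) == -1) { zmq_msg_close (&part); return -1; }
                if (zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size) != 0) {
                    zmq_msg_close (&part);
                    return -1;
                }
                zmq_msg_close (&part);
            } while (more);
            return 0;
        }

    This removes the question's arbitrary 1000-part cap : the ZMQ_RCVMORE flag itself terminates the loop, which is also why no "total part-count" needs to be ( or can be ) known in advance.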
    

    Given the RFC-23/ZMTP documented properties, there are but a few (wire-level telemetry encoded) warranties:

    1) all messages get sent/delivered:

    • atomically ( either error-free binary-identical all frames, or none at all )
    • at most once ( per relevant peer )
    • in order

    2) multi-part messages additionally get an internal (in-band)-telemetry "advice" of state - see the decoder sketch right after this list:

    • a bit-flagged state { 1: more-frames-follow| 0: no-more-frames }
    • a bit-flagged size-type { 0: 8b-direct-octet | 1: 64b-"network-endian"-coded }
    • a size-advice { 0~255: direct-size | 0~2^63-1: 64b-"network-endian"-coded-size }
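
    Purely to illustrate that in-band framing, a hand-rolled sketch of the ZMTP v3.x frame-header layout follows ( an application never needs to do this - libzmq decodes it internally - and the struct/field names here are ours, not a libzmq API ) :

        #include <stdint.h>
        #include <stddef.h>

        /* Illustrative decoder of a ZMTP v3.x frame header ( flags octet + size ). */
        typedef struct {
            int      more;       /* 1: more-frames-follow | 0: no-more-frames         */
            uint64_t size;       /* frame-body size advised by the header             */
            size_t   header_len; /* 2 octets ( short frame ) or 9 octets ( long )     */
        } zmtp_frame_hdr;

        static int decode_zmtp_frame_hdr (const uint8_t *buf, size_t len, zmtp_frame_hdr *hdr)
        {
            if (len < 2) return -1;           /* not enough bytes for any header      */
            uint8_t flags = buf[0];
            hdr->more = flags & 0x01;         /* bit 0: MORE flag                     */
            if (flags & 0x02) {               /* bit 1: LONG - 64b network-endian size */
                if (len < 9) return -1;
                uint64_t size = 0;
                for (int i = 0; i < 8; i++)
                    size = (size << 8) | buf[1 + i];
                hdr->size = size;
                hdr->header_len = 9;
            } else {                          /* SHORT - one direct size-octet 0~255  */
                hdr->size = buf[1];
                hdr->header_len = 2;
            }
            return 0;
        }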

    The documented zmq_recv() API is similarly rather explicit about this :

    Multi-part messages

    A ØMQ message is composed of 1 or more message parts. Each message part is an independent zmq_msg_t in its own right. ØMQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory.

    An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_msg_recv() to determine if there are further parts to receive.


    Whatever "ugly" this may look on a first read, the worst-case that would fit in memory is a huuuuuge amount of SMALL-sized messages inside a multi-part message-frame.

    The resulting time to "get-rid-of-'em" is not zero, sure, yet the benefits of the compact and efficient internal ZMTP-telemetry and of low-latency stream-processing were the more important goal ( and that goal was achieved ).

    If in doubt, benchmark the worst case first :

    a) "produce" about 1E9 multipart-message frames, transporting Zero-sized payloads ( no data, but all the message-framing )

    b) "setup" simplest possible "topology" PUSH/PULL

    c) "select" transport-class of your choice { inproc:// | ipc:// | tipc:// | ... | vmci:// } - best stack-less inproc:// ( I would start a stress-test with this )

    d) "stopwatch" such a blind-mechanical-Zero-shortcuts "flusher" between a ReferencePoint-S ( when zmq_poll( ZMQ_POLLIN ) has POSACK-ed the presence of any read-able content ) and a ReferencePoint-E ( when the last part of the many-part multipart-message has been looped-over by the blind-"flusher"-circus ) - a minimal sketch of such a test follows below.
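
    A minimal sketch of such a stress-test, under stated assumptions ( libzmq 4.x, POSIX clock_gettime(); NPARTS is scaled down from 1E9 so the inproc:// pipe comfortably fits in RAM ) :

        #include <zmq.h>
        #include <assert.h>
        #include <stdio.h>
        #include <time.h>

        #define NPARTS 1000000L                           /* [a] scale towards 1E9 as RAM permits */

        int main (void)
        {
            void *ctx  = zmq_ctx_new ();
            void *push = zmq_socket (ctx, ZMQ_PUSH);                  /* [b] simplest topology */
            void *pull = zmq_socket (ctx, ZMQ_PULL);
            assert (zmq_bind    (pull, "inproc://flush-bench") == 0); /* [c] stack-less class  */
            assert (zmq_connect (push, "inproc://flush-bench") == 0);

            /* produce one frame-heavy, payload-free multi-part message */
            for (long i = 0; i < NPARTS - 1; i++)
                assert (zmq_send (push, NULL, 0, ZMQ_SNDMORE) == 0);
            assert (zmq_send (push, NULL, 0, 0) == 0);

            /* [S] zmq_poll() POSACK-s the presence of read-able content */
            zmq_pollitem_t item = { pull, 0, ZMQ_POLLIN, 0 };
            assert (zmq_poll (&item, 1, -1) == 1);
            struct timespec t0, t1;
            clock_gettime (CLOCK_MONOTONIC, &t0);

            int    more;                                  /* the blind-"flusher" loop under test */
            size_t more_size = sizeof (more);
            long   parts = 0;
            do {
                zmq_msg_t part;
                assert (zmq_msg_init (&part) == 0);
                assert (zmq_msg_recv (&part, pull, 0) != -1);
                assert (zmq_getsockopt (pull, ZMQ_RCVMORE, &more, &more_size) == 0);
                zmq_msg_close (&part);
                parts++;
            } while (more);

            /* [E] the last part has been looped-over */
            clock_gettime (CLOCK_MONOTONIC, &t1);
            double ns = (t1.tv_sec - t0.tv_sec) * 1e9 + (t1.tv_nsec - t0.tv_nsec);
            printf ("flushed %ld parts in %.0f ns ( %.1f ns/part )\n", parts, ns, ns / parts);

            zmq_close (push);
            zmq_close (pull);
            zmq_ctx_destroy (ctx);
            return 0;
        }

    Build with e.g. cc flush_bench.c -lzmq and push NPARTS upward to approach the described worst case.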


    RESULT INTERPRETATION :

    Those nanoseconds, spent between [S] and [E], count as evidence of the worst-case amount of time that gets "scapegoated" into the knowingly blind "flusher"-looping circus. In real-world use-cases, there will be additional reasons for spending even more time on the same.

    Yet it is fair not to forget that the responsibility for sending such { knowingly-oversized | ill-formatted } multi-frame BEAST(s) is the root cause of any operational risks met when dealing with them in an otherwise ultra-low-latency, high-(almost-linear)-scalability focused messaging/signalling framework.

    It is the art of the Zen-of-Zero that has enabled all this to happen. Thanks go to Pieter HINTJENS and his team, led by Martin SÚSTRIK; we all owe them a lot for being able to build on their legacy.