I am implementing a simple REQ-REP
pattern with ZeroMQ in C using multi-part messaging. Most of my messages have exactly 4 parts each (in both directions), with a few exceptions. To enforce this rule, I need to determine the total number of parts of a received multi-part message. Knowing whether it is <= 4 is easy. Here is my receiver function:
#include <zmq.h>

#define BUFMAX 64 // Maximum size of text buffers
#define BUFRCV 63 // Maximum reception size of text buffers (reserve 1 space to add a terminal '\0')

char mpartstr[4][BUFMAX];
int recv_multi(void *socket, int *aremore)
// Receive up to the first 4 parts of a multi-part message into mpartstr[][].
// Returns the number of parts read (up to 4) or <0 if there is an error.
// Returns -1 if there is an error with a zmq function.
// It sets aremore=1 if there are still more parts to read after the fourth
// part (or aremore=0 if not).
{
    int len, rc, rcvmore, pdx;
    size_t rcvmore_size = sizeof(rcvmore);

    pdx = 0;
    len = zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
    if (len == -1) return len;
    if (len > BUFRCV) len = BUFRCV;   /* zmq_recv() returns the full message size even when it truncates */
    mpartstr[pdx][len] = '\0';
    rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size);
    if (rc) return -1;
    pdx++;
    if (rcvmore == 0) { *aremore = 0; return pdx; }

    while (rcvmore) {
        len = zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
        if (len == -1) return len;
        if (len > BUFRCV) len = BUFRCV;
        mpartstr[pdx][len] = '\0';
        rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size);
        if (rc) return -1;
        pdx++;
        if (pdx == 4) break;
    }
    *aremore = rcvmore;
    return pdx;
}
All fine. But now in my main() function I check whether there are more parts by looking at the value of aremore. In those cases where I am not expecting more, I will send an error message back to the sender, but I have found that ZeroMQ doesn't like it if I don't read ALL the parts of a multi-part message (it delivers the remaining parts of the old multi-part message the next time I call zmq_recv(), even after I have sent a message and expect a new, clean multi-part response).
So what I really need is a kind of "flush" function to clear out the remaining parts of a message that contains more than 4 parts, which I want to discard. So far the only way I have to do this is an ugly, arbitrary brute-force exhaustion function like the one below (aremore will have a value of 1 to begin with; it was set by the previous function):
int recv_exhaust(void *socket,int *aremore)
// Receive the remainder of a multipart message and discard the contents.
// Use this to clean out a multi-part 'inbox' from a wrongly sent message.
// Returns 0 on success
// Returns -1 on zmq function failure
// Returns -2 on failure to exhaust even after 1000 parts.
{
    int len, rc, rcvmore, pdx;
    size_t rcvmore_size = sizeof(rcvmore);

    pdx = 1;
    rcvmore = *aremore;
    while (rcvmore) {
        len = zmq_recv(socket, mpartstr[0], BUFRCV, 0);
        if (len == -1) return len;
        rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size);
        if (rc) return -1;
        pdx++;
        if (pdx > 1000) return -2;
    }
    return 0;
}
If there is no dedicated 'flusher' API, then at least I could get rid of my arbitrary 1000-part limit if I had some way of knowing in advance how many parts (in total) a given multi-part message has. Surely ZeroMQ knows this, because multi-part messages are sent as a whole block. Can anyone point me to the way to find that info? Or is there a proper 'flusher' function/method out there? (Standard C please, not C++/C#, etc.) Thanks in advance.
Q : Can anyone point me to the way to find that info?
Yes.
Q : is there a proper 'flusher' function/method out there?
Yes and no:
From ZeroMQ v2.x up until v4.3.1 there has been no explicit API call for a "flusher".
The beauty and the power of the low-latency smart messaging that the ZeroMQ design delivers is built on a wisely crafted Zen-of-Zero: always preferring performance to comfort, as the Zero-copy, Zero-warranty and other paradigms suggest.
A naive "flusher" ( and it pains me to trivialise this down to resorting to a primitive blocking recv() ... ) has to go all the way until ZMQ_RCVMORE no longer flags any further parts "beyond" the last frame of the multi-part message ( or until zmq_msg_more() == 0 confirms the same ). Still, all these operations do no more than pointer handling; no data gets "moved/copied/read" from RAM, just the pointer(s) get assigned, so it is indeed both fast and I/O-efficient:
int more;
size_t more_size = sizeof (more);
do {
    zmq_msg_t part;                       /* Create an empty ØMQ message to hold the message part */
    int rc = zmq_msg_init (&part);        assert (rc == 0 && "MSG_INIT failed");
    rc = zmq_msg_recv (&part, socket, 0); /* Block until a message part is available on the socket */
    assert (rc != -1 && "MSG_RECV failed");
    rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); /* Determine if more message parts are to follow */
    assert (rc == 0 && "GETSOCKOPT failed");
    zmq_msg_close (&part);
} while (more);
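Applied to the code from the question, this same loop also removes the need for the arbitrary 1000-part cap in recv_exhaust(). A minimal sketch, assuming the same socket and aremore convention as in the question; returning the number of discarded parts (instead of the original 0) is only an illustrative extra which, added to the parts already read, gives the total part count:

int recv_exhaust(void *socket, int *aremore)
// Receive and discard all remaining parts of the current multi-part message.
// Returns the number of parts discarded, or -1 on a zmq function failure.
{
    int rcvmore = *aremore;              /* 1 on entry: recv_multi() reported more parts pending */
    int ndiscarded = 0;
    size_t rcvmore_size = sizeof(rcvmore);

    while (rcvmore) {
        zmq_msg_t part;
        if (zmq_msg_init(&part) != 0) return -1;
        if (zmq_msg_recv(&part, socket, 0) == -1) {    /* blocks until the next part arrives */
            zmq_msg_close(&part);
            return -1;
        }
        if (zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &rcvmore_size) != 0) {
            zmq_msg_close(&part);
            return -1;
        }
        zmq_msg_close(&part);                          /* discard the part's content */
        ndiscarded++;
    }
    *aremore = 0;
    return ndiscarded;
}

Using zmq_msg_t here avoids both the fixed-size receive buffer and any guess about how many parts may still follow; the loop simply ends when ZMQ_RCVMORE reports 0.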
Given the RFC-23/ZMTP documented properties, there are but a few (wire-level, telemetry-encoded) warranties:
1) all messages get sent/delivered whole;
2) multi-part messages additionally carry an internal (in-band) telemetry "advice" of state per frame:
{ 1: more-frames-follow | 0: no-more-frames }
{ 0: 8b-direct-octet | 1: 64b-"network-endian"-coded }
{ 0~255: direct-size | 0~2^63-1: 64b-"network-endian"-coded-size }
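Purely to make those three in-band fields concrete, here is an illustrative decoder of one such ZMTP 3.x frame header; applications never parse this themselves ( libzmq handles the wire framing internally ), and the helper name zmtp_frame_header() is hypothetical:

#include <stdint.h>
#include <stddef.h>

/* Decode the header of one raw ZMTP 3.x frame already held in buf[0..len-1].        */
/* Returns the number of header octets consumed (2 or 9), or -1 if len is too short. */
int zmtp_frame_header(const uint8_t *buf, size_t len, int *more, uint64_t *body_size)
{
    if (len < 2) return -1;
    uint8_t flags = buf[0];
    *more = flags & 0x01;                 /* bit 0: 1 = more-frames-follow, 0 = no-more-frames */
    if (flags & 0x02) {                   /* bit 1: 1 = 64b-"network-endian"-coded size field  */
        if (len < 9) return -1;
        uint64_t size = 0;
        for (int i = 0; i < 8; i++)       /* big-endian (network) byte order                   */
            size = (size << 8) | buf[1 + i];
        *body_size = size;
        return 9;
    }
    *body_size = buf[1];                  /* bit 1 clear: one direct 8b size octet (0~255)     */
    return 2;
}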
The zmq_recv() API documentation is similarly rather explicit about this:

Multi-part messages
A ØMQ message is composed of 1 or more message parts. Each message part is an independent zmq_msg_t in its own right. ØMQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory.
An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_msg_recv() to determine if there are further parts to receive.
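For completeness, the zmq_msg_more() test mentioned above gives the same per-part answer without a zmq_getsockopt() call. A minimal sketch of the same drain idea; the helper name drain_remaining() is hypothetical:

int drain_remaining(void *socket)
// Discard message parts until zmq_msg_more() reports the last one.
// Call this only while a multi-part message has been partially read.
// Returns 0 on success, -1 on a zmq function failure.
{
    int more = 1;
    while (more) {
        zmq_msg_t part;
        if (zmq_msg_init(&part) != 0) return -1;
        if (zmq_msg_recv(&part, socket, 0) == -1) {
            zmq_msg_close(&part);
            return -1;
        }
        more = zmq_msg_more(&part);      /* 1 while further parts follow, 0 on the last part */
        zmq_msg_close(&part);
    }
    return 0;
}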
Whatever "ugly" this may look on a first read, the worst-case that would fit in memory is a huuuuuge amount of SMALL-sized messages inside a multi-part message-frame.
The resulting time to "get-rid-of-'em" is not zero, sure, yet the benefits of compact and efficient internal ZMTP-telemetry and low-latency stream-processing is way more important goal ( and was achieved ).
If in doubt, benchmark this worst case first:
a) "produce" about 1E9 multi-part message frames, transporting Zero-sized payloads ( no data, just all of the message framing )
b) "setup" the simplest possible "topology": PUSH/PULL
c) "select" a transport class of your choice { inproc:// | ipc:// | tipc:// | ... | vmci:// } - the stack-less inproc:// is best ( I would start a stress test with this )
d) stopwatch such a blind, mechanical Zero-shortcuts "flusher" between a ReferencePoint-S, when zmq_poll( ZMQ_POLLIN ) has POSACK-ed the presence of any read-able content, and a ReferencePoint-E, when the last part of the many-part multi-part message has been looped over by the blind-"flusher" circus ( a minimal sketch of such a stopwatch follows the next paragraph ).
Those nanoseconds spent between [S] and [E] count as evidence of the worst-case amount of time that gets "scapegoated" into a knowingly blind "flusher" loop. In real-world use cases there will be additional reasons for spending potentially even more time on doing the same.
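A minimal single-process sketch of such a stopwatch, assuming libzmq 4.x; the endpoint name inproc://flush-bench, the NPARTS constant and its deliberately scaled-down value are illustrative choices only:

#include <zmq.h>
#include <assert.h>
#include <stdio.h>
#include <time.h>      /* POSIX clock_gettime() */

/* Worst-case stopwatch: one multi-part message of NPARTS zero-sized payload  */
/* frames over a stack-less inproc:// PUSH/PULL pair. NPARTS is far below the */
/* 1E9 figure above; scale it up for a real stress test.                      */
#define NPARTS 100000L

int main (void)
{
    void *ctx  = zmq_ctx_new ();
    void *pull = zmq_socket (ctx, ZMQ_PULL);
    void *push = zmq_socket (ctx, ZMQ_PUSH);
    int rc;
    rc = zmq_bind    (pull, "inproc://flush-bench"); assert (rc == 0);
    rc = zmq_connect (push, "inproc://flush-bench"); assert (rc == 0);

    /* a) "produce" one huge multi-part message of Zero-sized payloads */
    for (long i = 0; i < NPARTS - 1; i++) {
        rc = zmq_send (push, "", 0, ZMQ_SNDMORE);    assert (rc == 0);
    }
    rc = zmq_send (push, "", 0, 0);                  assert (rc == 0);

    /* ReferencePoint-S: zmq_poll( ZMQ_POLLIN ) confirms read-able content */
    zmq_pollitem_t item = { pull, 0, ZMQ_POLLIN, 0 };
    rc = zmq_poll (&item, 1, -1);                    assert (rc == 1);
    struct timespec tS, tE;
    clock_gettime (CLOCK_MONOTONIC, &tS);

    /* the blind-"flusher" loop from above, counting parts as it goes */
    int    more      = 1;
    size_t more_size = sizeof (more);
    long   nparts    = 0;
    while (more) {
        zmq_msg_t part;
        rc = zmq_msg_init (&part);                   assert (rc == 0);
        rc = zmq_msg_recv (&part, pull, 0);          assert (rc != -1);
        rc = zmq_getsockopt (pull, ZMQ_RCVMORE, &more, &more_size);
        assert (rc == 0);
        zmq_msg_close (&part);
        nparts++;
    }

    /* ReferencePoint-E: the last part has been looped over */
    clock_gettime (CLOCK_MONOTONIC, &tE);
    double ns = (tE.tv_sec - tS.tv_sec) * 1e9 + (tE.tv_nsec - tS.tv_nsec);
    printf ("flushed %ld parts in %.0f ns ( %.1f ns/part )\n", nparts, ns, ns / nparts);

    zmq_close (push);
    zmq_close (pull);
    zmq_ctx_destroy (ctx);
    return 0;
}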
Yet it is fair not to forget that the responsibility for sending such { knowingly over-sized | ill-formatted } multi-frame BEAST(s) is the root cause of any operational risk of dealing with this in an otherwise ultra-low-latency, high-(almost-linear)-scalability focused messaging/signaling framework.
It is the art of the Zen-of-Zero that has enabled this to happen. All thanks to Pieter HINTJENS and his team, led by Martin SÚSTRIK; we all owe them a lot for being able to build on their legacy.