Tags: c++, performance, queue, monitoring, zeromq

Monitor queue usage on ZeroMQ PULL socket


My application needs to pull data from a PUSH socket (I do not have control of the server side). The data is processed and written to a file; this processing is single-threaded and I/O-bound, with variable performance.

I would like the application to do the minimum work to pull the messages into process memory so that a) the server is not blocked and won't drop any messages (which is already the case with just a normal PULL socket) and b) I can monitor the backlog of messages caused by slow processing.

I can achieve this using a wrapper socket with three sockets internally:

  • One for the actual connection to the server
  • A pair of sockets with an inproc:// connection, where I can keep count of messages as they are sent and received. So the data flow is:

Thread 1

  • rx.recv()
  • inproc_tx.send()
  • queued_messages++

Thread 2

  • inproc_rx.recv()
  • queued_messages--
  • do_processing()

This seemed like a sensible way to buffer the messages, since I already have zmq::message_t objects from recv() and I'm sure ZeroMQ's queuing is implemented more efficiently than anything I could roll myself. It works, but it seems like a lot of overhead and extra code when the underlying library already knows how many messages are queued in order to decide when to drop new ones.
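In outline, the relay looks something like this (a minimal sketch rather than my exact code; it assumes a recent cppzmq with the zmq::sockopt API, and the addresses and the inproc endpoint name are placeholders):

    #include <zmq.hpp>
    #include <atomic>
    #include <thread>

    std::atomic<long> queued_messages{0};

    void receiver(zmq::context_t& ctx) {
        zmq::socket_t rx(ctx, zmq::socket_type::pull);
        rx.connect("tcp://server:5555");                 // placeholder PUSH server address

        zmq::socket_t inproc_tx(ctx, zmq::socket_type::push);
        inproc_tx.set(zmq::sockopt::sndhwm, 0);          // unlimited, so this thread never blocks
        inproc_tx.bind("inproc://buffer");

        for (;;) {
            zmq::message_t msg;
            if (rx.recv(msg, zmq::recv_flags::none)) {   // pull off the network as fast as possible
                inproc_tx.send(msg, zmq::send_flags::none);
                ++queued_messages;                       // backlog counter, readable from anywhere
            }
        }
    }

    void processor(zmq::context_t& ctx) {
        zmq::socket_t inproc_rx(ctx, zmq::socket_type::pull);
        inproc_rx.set(zmq::sockopt::rcvhwm, 0);
        // connect-before-bind on inproc:// needs libzmq >= 4.2
        inproc_rx.connect("inproc://buffer");

        for (;;) {
            zmq::message_t msg;
            if (inproc_rx.recv(msg, zmq::recv_flags::none)) {
                --queued_messages;
                // do_processing(msg);                   // the slow, I/O-bound work
            }
        }
    }

    int main() {
        zmq::context_t ctx(1);
        std::thread t1(receiver, std::ref(ctx));
        std::thread t2(processor, std::ref(ctx));
        t1.join();
        t2.join();
    }

Any other thread (or a periodic log line) can read queued_messages to see the current backlog.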

Is there an API to query the queue usage on a ZeroMQ PULL socket? Or a better way to implement the above?


Solution

  • It's possible that you're underestimating what ZeroMQ will do for you under the hood, for free! The PUSH/PULL pattern sends each message from the PUSHer to an available PULLer. If you have more than one PULLer and one of them is getting a bit behind the demand, ZeroMQ will automatically send fewer messages to it.

    I also think that you'd be making a mistake in buffering received messages yourself. If you fall behind processing them in Thread 1, you're still going to fall behind processing them in Thread 2; you're just disguising the underperformance with some extra latency. Queueing messages off to one side like that also means not making use of what PUSH/PULL would do for you if you simply made both Thread 1 and Thread 2 PULLers (instead of pipelining them together). Take a look at Figure 5 in this part of the guide.
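    To illustrate that fan-out idea (a sketch under my own assumptions, not your actual setup): each worker thread gets its own PULL socket connected to the same PUSH endpoint, and ZeroMQ load-balances messages across them. The address and the worker count here are placeholders.

        #include <zmq.hpp>
        #include <thread>
        #include <vector>

        void worker(zmq::context_t& ctx) {
            zmq::socket_t rx(ctx, zmq::socket_type::pull);
            rx.connect("tcp://server:5555");        // every worker connects to the same PUSHer
            for (;;) {
                zmq::message_t msg;
                if (rx.recv(msg, zmq::recv_flags::none)) {
                    // do_processing(msg);          // each worker handles its share of the stream
                }
            }
        }

        int main() {
            zmq::context_t ctx(1);
            std::vector<std::thread> workers;
            for (int i = 0; i < 2; ++i)             // two PULLers; ZeroMQ fans messages out to both
                workers.emplace_back(worker, std::ref(ctx));
            for (auto& w : workers) w.join();
        }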

    In terms of backlog, you could set the high water mark on the PUSH socket to some acceptable value and put a time-out on zmq_send(). If you detect that zmq_send() timed out, then all is not well: the PULLers have fallen behind. And, really, that's all you need to know. There is no means I know of to discover how many messages are queued up waiting, but when you get right down to it, there will either be too many (the send times out) or effectively none (the PULLers are keeping up). If you want to detect the PULLers falling behind early, set the PUSH high water mark to some low value.
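    That check lives on the PUSH side, which is hypothetical here since you don't control the server, but for completeness a cppzmq sketch would look like this: set a low send HWM and a send time-out, and treat a timed-out send as "the PULLers have fallen behind". The endpoint, HWM, and time-out values are arbitrary.

        #include <zmq.hpp>
        #include <iostream>

        int main() {
            zmq::context_t ctx(1);
            zmq::socket_t tx(ctx, zmq::socket_type::push);
            tx.set(zmq::sockopt::sndhwm, 10);        // low HWM so a backlog is noticed early
            tx.set(zmq::sockopt::sndtimeo, 1000);    // zmq_send gives up after 1000 ms
            tx.bind("tcp://*:5555");                 // placeholder endpoint

            zmq::message_t msg("payload", 7);
            auto sent = tx.send(msg, zmq::send_flags::none);
            if (!sent) {
                // Timed out: the outgoing queue hit the HWM, so the PULLer(s) aren't keeping up.
                std::cerr << "send timed out - PULLers have fallen behind\n";
            }
        }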

    ADDED

    Given that your PULLer is held up by I/O, I suggest that the simplest thing to do is to:

    • get rid of Thread 2 altogether, along with inproc_tx and inproc_rx, as they're no longer needed,
    • set the ZMQ_RCVHWM option on your rx PULL socket to 0 (= no limit),
    • let ZMQ do all the buffering for you (it'll amount to the same thing anyway),
    • have Thread 1 do the writing to disk; there's no need for that to be an async operation or anything like that.

    This way, ZMQ will buffer as many messages as RAM permits (instead of the default of 1000). It'll even buffer them on the sending side (up to whatever the sending HWM is set to; 1000 is the default) if your side starts to exhaust memory and slow down. In Thread 1 you could periodically ask the OS how much free RAM is left, with a shortage of free RAM now being your actual point of concern (rather than how many messages are buffered). If you've still got free RAM, you're not holding up the PUSHer sending you all this stuff.
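    Put together, a minimal sketch of that simplified receiver (cppzmq again; the endpoint and the output file name are placeholders, and the free-RAM check is left as a platform-specific stub):

        #include <zmq.hpp>
        #include <fstream>

        int main() {
            zmq::context_t ctx(1);
            zmq::socket_t rx(ctx, zmq::socket_type::pull);
            rx.set(zmq::sockopt::rcvhwm, 0);          // 0 = no limit: buffer until RAM runs out
            rx.connect("tcp://server:5555");          // placeholder address of the PUSH server

            std::ofstream out("data.bin", std::ios::binary);
            for (;;) {
                zmq::message_t msg;
                if (rx.recv(msg, zmq::recv_flags::none)) {
                    // The socket's I/O thread keeps queueing newer messages while this blocks on disk.
                    out.write(static_cast<const char*>(msg.data()), msg.size());
                }
                // Periodically check free RAM here (platform-specific); memory pressure,
                // not queue length, is now the thing to watch.
            }
        }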

    It also exploits the management (I/O) thread that lies behind each socket and does all the work; as that's library code, you may as well make it earn its keep and do the buffering for you, rather than complicating your own code. You can think of your Thread 1 as being asynchronous to that socket management thread (which is what's actually handling the network connection on your behalf).