I have a ZMQ_PULL/ZMQ_PUSH socket connection.
I have multiple ZMQ_PUSH connections pushing to a single ZMQ_PULL connection.
ZMQ_PUSH connection 1----->
ZMQ_PUSH connection 2-----> ZMQ_PULL
ZMQ_PUSH connection N----->
I do not need every message, I just need the latest message that was sent. I am doing some inference on the back end and am streaming the results to the ZMQ_PULL socket.
I have set ZMQ_CONFLATE to true on the ZMQ_PULL socket. The documentation describes the option as follows: "If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options."
But after testing I realized that I actually need the last message from each connection, not just the last message overall. So with 3 connections, the socket would read from each one in round-robin fashion, and I would always have the latest message from each connection.
Is there an option that is like Conflate, but instead of for all messages, it is for each connection?
Is there an option that is like Conflate, but instead of for all messages, it is for each connection?
No.
The documentation you cite explains that 0MQ does not currently offer direct support for such a single-socket use case. You could certainly code it up and submit an upstream PR so that future revs of 0MQ offer such functionality.
Given that you'll need app-level support to make this work with 0MQ 4.3, the simplest approach would be to maintain N ZMQ_PULL sockets, one per ZMQ_PUSH connection, each with ZMQ_CONFLATE set, as you're already aware.
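A minimal sketch of the N-socket approach, assuming pyzmq and TCP on localhost. The helper names bind_conflated_pulls and latest_from_each are made up for illustration, and binding to random ports is just a convenience for the example. Note that ZMQ_CONFLATE is set before binding, and that the option does not support multipart messages.

```python
import zmq


def bind_conflated_pulls(ctx, names, host="tcp://127.0.0.1"):
    """Bind one conflating PULL socket per pusher name (hypothetical helper)."""
    pulls, endpoints = {}, {}
    for name in names:
        sock = ctx.socket(zmq.PULL)
        # Set ZMQ_CONFLATE before bind so it applies to incoming connections.
        sock.setsockopt(zmq.CONFLATE, 1)
        port = sock.bind_to_random_port(host)
        pulls[name] = sock
        endpoints[name] = f"{host}:{port}"
    return pulls, endpoints


def latest_from_each(pulls, timeout_ms=100):
    """Return {name: newest message} for every socket that has one waiting."""
    latest = {}
    for name, sock in pulls.items():
        # Each socket holds at most one message: the most recent from its pusher.
        if sock.poll(timeout_ms, zmq.POLLIN):
            latest[name] = sock.recv()
    return latest
```

Each pusher then connects to its own endpoint from endpoints, and the consumer calls latest_from_each(pulls) whenever it is ready for fresh data.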
An alternate approach would be to assign a dedicated thread or process to keep draining the existing muxed socket, and update a shared memory data structure that interested clients could consult. The idea is to burn a core on keeping the queue mostly empty, while doing no processing, just focusing on communications. Then other cores can examine "most recent message" and each one then embarks on some expensive processing, while another core continues to keep the queue drained. This is essentially offering the 0MQ service proposed above but at a different place in the stack, up a level, within your application.
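The drainer idea could look roughly like this in Python with pyzmq. This is a sketch under a few assumptions that are not in your setup: each pusher sends a two-frame message [sender_id, payload] so the drainer can tell connections apart (a PULL socket does not expose the peer), ZMQ_CONFLATE is left off on the muxed socket, and the names LatestBySender and drain are hypothetical.

```python
import threading

import zmq


class LatestBySender:
    """Thread-safe map of sender id -> most recent payload."""

    def __init__(self):
        self._lock = threading.Lock()
        self._latest = {}

    def update(self, sender, payload):
        with self._lock:
            self._latest[sender] = payload  # overwrite any older payload

    def snapshot(self):
        with self._lock:
            return dict(self._latest)


def drain(pull, store, stop):
    """Keep the muxed PULL socket empty; no processing happens here."""
    while not stop.is_set():
        # Poll with a timeout so the stop flag is checked regularly.
        if pull.poll(50, zmq.POLLIN):
            sender, payload = pull.recv_multipart()
            store.update(sender, payload)
```

Run drain in a threading.Thread and let the processing code call store.snapshot() whenever it wants the newest message per connection. In CPython, a thread is fine for the I/O-bound drainer, but CPU-heavy consumers would likely need separate processes to actually use other cores.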
To do this in a distributed way, the "queue draining service" would need to know about idle workers. That is, a worker could publish a brief "I just completed an expensive task" message, which would trigger the drainer to post a fresh work item, never using shared memory at all. This lets the drainer worry about eliding stale messages: those that arrived when no worker was available to start on them immediately and that have since been superseded by a more recent message.
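The bookkeeping for that distributed variant can be sketched independently of the sockets. The class below is hypothetical and covers only the decision logic: which messages to elide and which idle worker gets which item. Wiring on_message to the PULL socket and on_worker_idle to whatever channel the workers report on is left out, and the source and worker ids are assumed to be supplied by your application.

```python
from collections import OrderedDict, deque


class LatestOnlyDispatcher:
    """Hands each idle worker the freshest pending item, one per source."""

    def __init__(self):
        self._pending = OrderedDict()  # source id -> newest unassigned payload
        self._idle = deque()           # worker ids waiting for work

    def on_message(self, source, payload):
        """Record a new message, replacing any unassigned one from the same source."""
        # Assigning to an existing key keeps its queue position, so a chatty
        # source cannot starve the others.
        self._pending[source] = payload
        return self._assign()

    def on_worker_idle(self, worker):
        """Called when a worker reports that it finished its task."""
        self._idle.append(worker)
        return self._assign()

    def _assign(self):
        """Pair idle workers with pending items; returns [(worker, source, payload)]."""
        assignments = []
        while self._idle and self._pending:
            worker = self._idle.popleft()
            source, payload = self._pending.popitem(last=False)
            assignments.append((worker, source, payload))
        return assignments
```

The drainer would send each returned (worker, source, payload) tuple to the named worker. Anything overwritten in _pending before a worker became free is never sent at all.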