Tags: c++, zeromq, publish-subscribe, distributed-system, low-latency

ZeroMQ Publish and Subscribe concurrently


I'm working on a program in C++ that needs to be able to send and receive JSON payloads to and from an arbitrary number of other clients.

At first, I tried to use the PubNub service, but I found that I can't receive and publish messages at the same time (even when using two different contexts on distinct threads). I need to be able to do that. I also found that PubNub has too much latency for my liking.

I came across the ZeroMQ library, whose PUB/SUB model would suit my needs. But all the examples I came across explain how to implement it in a way that one process is a Publisher OR a Subscriber, not both at the same time.

Ideally, I would like to program a server that relays all messages coming from anyone to everyone subscribed to the channel specified in the message. Anyone should be able to receive and publish messages to anyone else on the network, provided they are subscribed to the correct channel.


UPDATE 1:

Note: I do not need guaranteed receipt, because payload N+1 takes precedence over payload N. I want a fire-and-forget means of communication (UDP-like).

As requested: the PubNub limit of 32 kB per JSON payload was perfect for me; I do not need more. In fact, my payloads are around 4 kB on average. All client instances will run on the same local network, so latency should ideally be under 5 ms. As for the number of clients, there won't be more than 4 clients subscribed to the same channel/topic at a time.


UPDATE 2 :

I cannot predict how many channels/topics will exist ahead of time, but it will be in the order of dozens (most of the time), hundreds (at peak). Not thousands.


Questions:

Q1: Can I implement such a behavior using ZeroMQ?
Q2: Is there any working sample demonstrating that (preferably in C++)?
Q3: If not, any suggestions for a library in C++?


[figure: pub-sub architecture]


Solution

  • ZeroMQ : is capable of serving this task well, within the scales given above
  • nanomsg : is capable of serving this task too; one just needs to cross-check ports / bindings for the clients

    Design review:

    • client instances are not persistent; they may freely appear on their own and may freely disappear, on their own or on error
    • a client instance decides on its own what it is about to PUB-lish as a message payload
    • a client instance decides on its own what it is about to SUB-scribe to, as a TOPIC-filter on the actual incoming stream of messages
    • a client instance exchanges ( sends ), on its own, plain, non-multipart, JSON-formatted messages it has prepared / produced
    • a client instance collects ( receives ) messages, which it assumes to be of the same non-multipart, JSON-formatted shape, and attempts to process them locally once a receive is complete
    • the maximum # of client instances does not exceed a low number of hundreds
    • the maximum size of any JSON-formatted payload is less than 32 kB, about 4 kB on average
    • the maximum latency acceptable for an E2E process-to-process delivery across a common LAN collision domain is under 5,000 [usec]
    • the server instance is a central-role, persistent entity
    • the server instance provides a known transport-class URL-target for all late-joiners' .connect()-s

    Proposal:

    the server may deploy multiple behaviours to meet the given goals, using both the PUB and the SUB archetypes: it runs a code-driven, fast, non-blocking event-loop .poll() attached to its SUB-side and re-transmits any of its SUB-side .recv()-ed payloads to its PUB-side, i.e. to the currently .connect()-ed audience ( the live client instances ):

    set s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
    and s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

    for performance reasons ( not so demanding here ), one may also segregate the workload-streams' processing by mapping each socket onto disjoint sub-sets of the context's multiple I/O-threads ( note that ZMQ_AFFINITY is a bitmask, and a value of 0 means no affinity at all ):

    map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 1 ); // bitmask: I/O-thread 0
    and s_PUB_send.setsockopt( ZMQ_AFFINITY, 2 ); // bitmask: I/O-thread 1

    set s_PUB_send.bind( "tcp://*:8899" ); // clients' [SUB]-s .connect() here
    +
    set s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
    set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
    set s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg ( beware: across *all* topics )
    set s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking on close
    set s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );

    and s_SUB_recv.bind( "tcp://*:8888" ); // clients' [PUB]-s .connect() here
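
    To make the above concrete, here is a minimal, compilable C++ sketch of such a server-side relay, written against the plain libzmq C API ( the ports and the 32 kB ceiling come from the proposal above; error handling is reduced to bare asserts, and ZMQ_TOS / ZMQ_CONFLATE are intentionally left out, per the CONFLATE caveat in the comments ):

    // relay_server.cpp : a sketch of the SUB -> PUB relay proposed above,
    // using the plain libzmq C API; not production-grade error handling
    #include <zmq.h>
    #include <cassert>
    #include <cstdint>

    int main()
    {
        void *ctx = zmq_ctx_new();
        assert( ctx );
        zmq_ctx_set( ctx, ZMQ_IO_THREADS, 2 );               // before any socket

        void *s_SUB_recv = zmq_socket( ctx, ZMQ_SUB );
        void *s_PUB_send = zmq_socket( ctx, ZMQ_PUB );

        uint64_t aff = 1;                                    // bitmask: I/O-thread 0
        zmq_setsockopt( s_SUB_recv, ZMQ_AFFINITY, &aff, sizeof aff );
        aff = 2;                                             // bitmask: I/O-thread 1
        zmq_setsockopt( s_PUB_send, ZMQ_AFFINITY, &aff, sizeof aff );

        int64_t maxSz = 32000;                               // protective ceiling
        zmq_setsockopt( s_SUB_recv, ZMQ_MAXMSGSIZE, &maxSz, sizeof maxSz );
        int linger = 0;                                      // avoid blocking on exit
        zmq_setsockopt( s_SUB_recv, ZMQ_LINGER, &linger, sizeof linger );
        zmq_setsockopt( s_SUB_recv, ZMQ_SUBSCRIBE, "", 0 );  // relay *every* topic
        // NOTE: ZMQ_CONFLATE is deliberately not set here: on the relay's
        // SUB-side it would keep just one last message across *all* topics

        int rc = zmq_bind( s_PUB_send, "tcp://*:8899" );     // clients' SUBs .connect() here
        assert( rc == 0 );
        rc     = zmq_bind( s_SUB_recv, "tcp://*:8888" );     // clients' PUBs .connect() here
        assert( rc == 0 );

        zmq_pollitem_t items[] = { { s_SUB_recv, 0, ZMQ_POLLIN, 0 } };
        char buf[32000];
        for (;;)
        {   zmq_poll( items, 1, 10 );                        // timeout in [ms]
            if ( items[0].revents & ZMQ_POLLIN )
            {   int n = zmq_recv( s_SUB_recv, buf, sizeof buf, 0 );
                if ( n > 0 )
                    zmq_send( s_PUB_send, buf, (size_t) n, 0 ); // re-transmit as-is
            }
        }
    }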


    Similarly,
    a client instance may deploy a reverse-facing tandem of both a PUB-endpoint and a SUB-endpoint, ready to .connect() to the known transport-target URLs.

    The client-specific subscription is decided locally and selects what gets filtered from the incoming stream of messages. Prior to the ZeroMQ v3.1 API, the full stream of all messages was delivered to each client instance over the transport class and filtered on the SUB-side; since API v3.1+, the topic-filter operates on the PUB-side, which in the desired modus operandi eliminates the wasted volumes of data over the network, but at the same time increases the PUB-side processing overhead ( ref.: the remark above on mapping multiple I/O-threads for a performance boost ).

    set c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
    and c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

    unless the payload-assembly / processing overhead grows close to the permitted end-to-end latency threshold, there shall be no need to separate / segregate the ZeroMQ low-level I/O-threads here; if it ever does:
    map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 1 ); // bitmask: I/O-thread 0
    and c_PUB_send.setsockopt( ZMQ_AFFINITY, 2 ); // bitmask: I/O-thread 1

    set c_PUB_send.connect( "tcp://server:8888" ); // the reverse of the server's [SUB]-side .bind()
    +
    set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // channel topic-filters, modifiable on-the-fly
    set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
    set c_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just the last msg
    set c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking on close
    set c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
    and c_SUB_recv.connect( "tcp://server:8899" );
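
    Again a minimal, compilable C++ sketch under the same assumptions ( plain libzmq C API; the hostname "server" and the channel name "chanA" are illustrative placeholders ). Note that a SUB-side topic-filter is a plain prefix-match on the raw message bytes, so prefixing each JSON payload with its channel name is what makes the per-channel routing work:

    // relay_client.cpp : a sketch of one client instance: a PUB endpoint
    // .connect()-ed to the server's SUB-side, plus a SUB endpoint
    // .connect()-ed to the server's PUB-side
    #include <zmq.h>
    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    int main()
    {
        void *ctx = zmq_ctx_new();
        assert( ctx );

        void *c_PUB_send = zmq_socket( ctx, ZMQ_PUB );
        void *c_SUB_recv = zmq_socket( ctx, ZMQ_SUB );

        int64_t maxSz = 32000;                               // protective ceiling
        zmq_setsockopt( c_SUB_recv, ZMQ_MAXMSGSIZE, &maxSz, sizeof maxSz );
        int conflate = 1;                                    // payload N+1 supersedes N
        zmq_setsockopt( c_SUB_recv, ZMQ_CONFLATE, &conflate, sizeof conflate );
        int linger = 0;
        zmq_setsockopt( c_SUB_recv, ZMQ_LINGER, &linger, sizeof linger );
        // topic-filter == prefix-match on raw bytes; changeable at runtime
        zmq_setsockopt( c_SUB_recv, ZMQ_SUBSCRIBE, "chanA", 5 );

        int rc = zmq_connect( c_PUB_send, "tcp://server:8888" ); // server's SUB-side
        assert( rc == 0 );
        rc     = zmq_connect( c_SUB_recv, "tcp://server:8899" ); // server's PUB-side
        assert( rc == 0 );

        // fire-and-forget publish ( note: messages sent before the connection
        // is actually established are silently dropped, the "slow joiner" )
        const char msg[] = "chanA {\"k\":42}";
        zmq_send( c_PUB_send, msg, sizeof msg - 1, 0 );

        char buf[32001];
        for (;;)
        {   int n = zmq_recv( c_SUB_recv, buf, sizeof buf - 1, 0 );
            if ( n < 0 ) break;                              // interrupted / error
            buf[n] = '\0';
            std::printf( "received: %s\n", buf );            // local JSON processing here
        }
        zmq_close( c_SUB_recv );
        zmq_close( c_PUB_send );
        zmq_ctx_term( ctx );
    }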


    Discussion:

    For hobby projects, not much more is needed on the messaging infrastructure; nevertheless, for more serious domains, there are additional services both the server and the client instances ought to have added, as further formal-communication-pattern behaviours:
    - r/KBD for a remote keyboard, with CLI-alike ad-hoc inspection utilities
    - KEEP_ALIVE transponders for allowing a system-wide state- / performance-monitoring ( a minimal sketch follows below the list )
    - SIG_EXIT handlers for allowing system-wide / instance-specific SIG_EXITs
    - a distributed syslog service, to allow a non-blocking replica of log-records to be safely collected / stored ( be it during a debug phase, a performance-tuning phase or for production-grade records-of-evidence collecting )
    - Identity Management tools for audit-trails et al
    - WhiteList/BlackList for adding robustness to the infrastructure, to make it better immune to DoS-attacks / poisoning erroneous NIC traffic-bursts et al
    - Adaptive Node re-Discovery for smarter / ad-hoc infrastructure design and status monitoring, or when multi-role / ( N + M )-shaded active hot-standby role-handover/takeover scenarios et al come onto the stage
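
    As one illustration of the KEEP_ALIVE item above, a client can simply re-use its already-.connect()-ed PUB endpoint and periodically emit a heartbeat on a reserved topic; a monitoring instance then subscribes to just that prefix. A minimal sketch ( the topic name "sysKA" and the message shape are arbitrary assumptions, not part of the design above ):

    // keep_alive.cpp : illustrative only; 'pub' is an already .connect()-ed
    // PUB socket, 'node_id' identifies this client instance
    #include <zmq.h>
    #include <cstdio>
    #include <ctime>

    void send_keep_alive( void *pub, const char *node_id )
    {
        char msg[128];
        int n = std::snprintf( msg, sizeof msg,
                               "sysKA {\"node\":\"%s\",\"t\":%ld}",
                               node_id, (long) std::time( nullptr ) );
        if ( n > 0 )
            zmq_send( pub, msg, (size_t) n, 0 );             // fire-and-forget
    }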

    Summary

    A1: Yes, this is fully within ZeroMQ's capabilities
    A2: Yes, C++ code-samples are available in the ZeroMQ book / Guide
    A3: Ref. A1; plus you may like the in-depth remarks in Martin SUSTRIK's post on "Differences between nanomsg and ZeroMQ"

    Hope you will enjoy the powers of distributed processing, be it supported by ZeroMQ or nanomsg or both.

    Only one's own imagination is the limit.

    If interested in further details, one might love the book referred to in The Best Next Step section of this post.