I'm working on a program in C++ that needs to be able to send / receive JSON payloads from an arbitrary number of other clients.
At first, I tried to use the PubNub service, but I found I can't receive and publish messages at the same time (even using two different contexts on distinct threads). I need to be able to do that. I also found that PubNub has too much latency for my liking.
I came across the ZeroMQ library, which has a PUB/SUB model that would suit my needs. But all the examples I came across explain how to implement it so that one process is either a Publisher or a Subscriber, not both at the same time.
Ideally, I would like to program a server that would relay all messages coming from anyone, to anyone subscribed to a specific channel specified in the message. Anyone should be able to receive and publish messages to anyone else on the network, provided they are subscribed to the correct channel.
UPDATE 1:
Note: I do not need assurance of receipt, because payload N+1 will take precedence over payload N. I want a send-and-forget means of communication (UDP-like).
As requested: the PubNub limit of 32 kB per JSON payload was perfect for me; I do not need more. In fact, my payloads are around 4 kB on average. All client instances will run on the same local network, so latency should ideally be less than 5 ms. As for the number of clients, there won't be more than 4 clients subscribed to the same channel/topic at a time.
UPDATE 2:
I cannot predict ahead of time how many channels/topics will exist, but it will be on the order of dozens (most of the time) or hundreds (at peak). Not thousands.
Q1: Can I implement such behavior using ZeroMQ?
Q2: Is there any working sample demonstrating that (preferably in C++)?
Q3: If not, any suggestions for a library in C++?
ZeroMQ: is capable of serving this task well within the scales given above
nanomsg: is capable of serving this task too, though one needs to cross-check the ports/bindings available for the clients

Design review:
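- each client decides what to PUB-lish as a message payload
- each client decides what to SUB-scribe to as an actual incoming stream of messages ( TOPIC-filter )
- each client sends the JSON-formatted messages it has prepared / produced
- each client receives messages it assumes to be in the same JSON-formatted shape, and an attempt to get 'em locally-processed will take place after a receive is complete
- any JSON-formatted payload is less than 32 kB, about 4 kB on average
- the acceptable latency is under 5,000 [usec]
- the server provides a known URL-target for all late-joiners' .connect()-s

Proposal: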
The server may deploy multiple behaviours to meet the given goals, using both the PUB and SUB behaviours, and provides a code-driven, fast, SUB-side attached, non-blocking event-loop .poll() with aligned re-transmission of any of its SUB-side .recv()-ed payloads to its PUB-side, currently .connect()-ed, audience ( live client instances ):
set s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
For performance reasons, which are not so demanding here, one may also segregate the workload-streams' processing by mapping each one onto disjoint sub-sets of the multiple created I/O-threads:
map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 1 ); // bitmap: bit 0 = I/O-thread 1
and s_PUB_send.setsockopt( ZMQ_AFFINITY, 2 ); // bitmap: bit 1 = I/O-thread 2 ( 0 would mean no affinity )
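ZMQ_AFFINITY is documented as a bitmap over the context's I/O-threads rather than a thread index, so the value to pass can be computed from thread numbers. A minimal, library-free sketch follows; `affinity_mask` is a hypothetical helper name, not part of ZeroMQ, and it only produces the integer one would hand to `setsockopt`.

```cpp
#include <cstdint>
#include <initializer_list>

// Build a ZMQ_AFFINITY-style bitmap from 1-based I/O-thread numbers.
// Bit 0 selects I/O-thread 1, bit 1 selects I/O-thread 2, and so on.
// An empty list yields 0, which ZeroMQ treats as "no affinity"
// (work is spread over all I/O-threads).
std::uint64_t affinity_mask(std::initializer_list<unsigned> io_threads) {
    std::uint64_t mask = 0;
    for (unsigned t : io_threads) {
        if (t >= 1 && t <= 64) {            // ignore out-of-range numbers
            mask |= std::uint64_t{1} << (t - 1);
        }
    }
    return mask;
}
```

With this helper, `affinity_mask({1})` and `affinity_mask({2})` give two disjoint masks for the SUB-side and PUB-side sockets, provided the context was created with at least two I/O-threads.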
set s_PUB_send.bind( "tcp://*:8899" ); // all interfaces, so LAN clients can reach it
+
set s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set s_SUB_recv.setsockopt( ZMQ_CONFLATE, 1 ); // retain just the last msg ( one slot for all TOPICs )
set s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
and s_SUB_recv.bind( "tcp://*:8888" ); // [PUB]s .connect()
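The ZMQ_CONFLATE option above keeps a single message in the socket's queue, the most recent one. The following library-free model illustrates that "payload N+1 replaces payload N" behaviour; `LastValueSlot` is a hypothetical name used only for this sketch and is not how ZeroMQ is implemented internally.

```cpp
#include <optional>
#include <string>
#include <utility>

// One-slot inbox: a newer message overwrites an unread older one.
class LastValueSlot {
public:
    // Store a message, discarding whatever was waiting in the slot.
    void push(std::string msg) { slot_ = std::move(msg); }

    // Hand out the waiting message (if any) and leave the slot empty.
    std::optional<std::string> take() {
        std::optional<std::string> out = std::move(slot_);
        slot_.reset();
        return out;
    }

private:
    std::optional<std::string> slot_;
};
```

Note that the slot is shared by every topic: a waiting payload for one channel is displaced by a newer payload for another. On a relay that serves many channels this may drop more than intended, so it may be worth conflating only on the client side, where the subscription is narrow.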
Similarly, a client instance may deploy a reverse-facing tandem of both a PUB-endpoint and a SUB-endpoint, ready to .connect() to a known transport-target-URL.
The client-specific subscription locally decides what is to get filtered from the incoming stream of messages. Prior to the ZeroMQ v.3.1 API, the plenitude of all messages gets delivered to each client instance over the transport class; since API v.3.1+, however, the topic-filter is operated on the PUB-side, which in the desired modus operandi eliminates the wasted volumes of data over the network but, at the same time, increases the PUB-side processing overhead ( ref.: the remarks above on the principle of increased multi-I/O-threads mapping / performance boost ).
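ZeroMQ topic-filters are plain byte-prefix matches: a message is delivered if it starts with any of the subscribed prefixes, and an empty prefix matches everything. A small library-free sketch of that rule follows; the function names `matches` and `delivered` are hypothetical and chosen for this illustration only.

```cpp
#include <string>
#include <string_view>
#include <vector>

// True if `message` begins with `prefix` (an empty prefix matches all).
bool matches(std::string_view message, std::string_view prefix) {
    return message.substr(0, prefix.size()) == prefix;
}

// True if at least one subscription prefix matches the message.
bool delivered(std::string_view message,
               const std::vector<std::string>& subscriptions) {
    for (const auto& prefix : subscriptions) {
        if (matches(message, prefix)) {
            return true;
        }
    }
    return false;
}
```

Because the match is a prefix match, a subscription to "chan42" also lets "chan420 ..." through. Ending each channel name with a delimiter (for example a trailing space) and subscribing to the name including that delimiter avoids the overlap.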
set c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
Unless the payload-assembly/processing overhead grows close to the permitted End-to-End latency threshold, there should be no need to separate / segregate the ZeroMQ low-level I/O-threads here:
map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 1 ); // bitmap: bit 0 = I/O-thread 1
and c_PUB_send.setsockopt( ZMQ_AFFINITY, 2 ); // bitmap: bit 1 = I/O-thread 2
set c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set c_SUB_recv.setsockopt( ZMQ_CONFLATE, 1 ); // take just last
set c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
set c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
and c_SUB_recv.connect( "tcp://server:8899" );
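Since ZMQ_CONFLATE does not support multi-part messages, the channel name and the JSON payload have to travel in one frame, with the channel first so that the prefix filter can see it. One possible layout, sketched below without any ZeroMQ calls, is "channel, one space, JSON". The space delimiter, the helper names and the assumption that channel names contain no spaces are all choices made for this sketch, not something the setup above prescribes.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

// Same ceiling as the ZMQ_MAXMSGSIZE value used above.
constexpr std::size_t kMaxFrameBytes = 32000;

// Compose a single frame: "<channel> <json>".
std::string make_frame(std::string_view channel, std::string_view json) {
    std::string frame;
    frame.reserve(channel.size() + 1 + json.size());
    frame.append(channel);
    frame.push_back(' ');
    frame.append(json);
    return frame;
}

// True if the frame would pass the receiver's size ceiling.
bool fits_ceiling(std::string_view frame) {
    return frame.size() <= kMaxFrameBytes;
}

// Split a frame at the first space into (channel, json).
// Returns std::nullopt if the frame carries no delimiter.
std::optional<std::pair<std::string, std::string>>
split_frame(std::string_view frame) {
    const std::size_t pos = frame.find(' ');
    if (pos == std::string_view::npos) {
        return std::nullopt;
    }
    return std::make_pair(std::string(frame.substr(0, pos)),
                          std::string(frame.substr(pos + 1)));
}
```

A client would pass the result of `make_frame` to its PUB-side send call and run `split_frame` on whatever its SUB-side receives before handing the JSON part to a parser.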
For hobby projects, not much more is needed from the messaging infrastructure. For more serious domains, however, both the server and client instances ought to have some further formal-communication pattern behaviours added:
- r/KBD for a remote keyboard, with CLI-alike ad-hoc inspection utilities
- KEEP_ALIVE transponders for allowing system-wide state- / perf-monitoring
- SIG_EXIT handlers for allowing system-wide / instance-specific SIG_EXITs
- distributed syslog service to safely collect / store a non-blocking replica of log-records ( be it during the debug phase, the performance-tuning phase or production-grade records-of-evidence collecting )
- Identity Management tools for audit-trails et al
- WhiteList/BlackList for adding robustness to the infrastructure, to make it more immune to DoS-attacks / poisoning by erroneous NIC traffic-bursts et al
- Adaptive Node re-Discovery for smarter / ad-hoc infrastructure design and status monitoring, or when multi-role / ( N + M )-shaded active hot-standby role-handover/takeover scenarios et al come onto the stage
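As an illustration of the KEEP_ALIVE item in the list above, the bookkeeping behind such a monitor can be as small as a table of last-seen timestamps. The sketch below is library-free and purely hypothetical: the class name `KeepAliveTable`, the node identifiers and the timeout are assumptions, and the transport of the keep-alive messages themselves is left out.

```cpp
#include <chrono>
#include <map>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Remembers when each node was last heard from and reports the ones
// that have been silent for longer than the configured timeout.
class KeepAliveTable {
public:
    explicit KeepAliveTable(std::chrono::milliseconds timeout)
        : timeout_(timeout) {}

    // Record a keep-alive from `node` observed at time `now`.
    void seen(const std::string& node, Clock::time_point now) {
        last_seen_[node] = now;
    }

    // List the nodes whose last keep-alive is older than the timeout.
    std::vector<std::string> silent(Clock::time_point now) const {
        std::vector<std::string> out;
        for (const auto& [node, when] : last_seen_) {
            if (now - when > timeout_) {
                out.push_back(node);
            }
        }
        return out;
    }

private:
    std::chrono::milliseconds timeout_;
    std::map<std::string, Clock::time_point> last_seen_;
};
```

Passing the current time in as a parameter, instead of calling `Clock::now()` inside the class, keeps the table easy to test and lets one event-loop pass reuse a single timestamp.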
A1: Yes, fully within ZeroMQ capabilities
A2: Yes, C++ code samples are available in the ZeroMQ book / Guides
A3: Ref.: A1, plus you may like the in-depth remarks in Martin SUSTRIK's post on "Differences between nanomsg and ZeroMQ"
Hope you will enjoy the powers of distributed processing, be it supported by ZeroMQ or nanomsg or both.
Only one's own imagination is the limit.
If interested in further details, one might love the book referred to in The Best Next Step section of this post.