Tags: python, zeromq, pyzmq, xpub

Is ZeroMQ XPUB recv() a solution for detecting whether there is a subscriber and for solving the slow-joiner syndrome?


My use case:

  1. The subscriber will be a server (binding to a port) and will wait for messages from multiple publishers.
  2. The publishers will be initialized inside different threads as clients (connecting to the port).
  3. The data to publish in each thread will be a couple of messages.
  4. It is important that, when a subscriber is connected, it gets every message, and as soon as possible.
  5. If no subscriber is connected, I don't want to keep the publisher thread blocked; ideally a timeout of around 1-2 s works.

The slow joiner problem:

Running more than 1000 threads (publishers), only 1 or 2 times do I get all the data in the subscriber. Adding a sleep of a few milliseconds solves the issue, so I'm 99.9% sure that I'm a victim of the well-known slow-joiner syndrome. However, a sleep is not a good solution in my case, as the connect time of a publisher can vary and I want the data to reach the subscriber as soon as possible.

My thoughts and experimental code for solving this issue:

My solution is based on using the XPUB socket's recv() method. Initialize the publisher as XPUB and set RCVTIMEO to 1000 ms. After the publisher connects, I add a recv() call to check whether there is a subscriber. When I get the subscription message, I know the connection has been finalized and that I can send data without any of it being lost (unless something goes wrong on the subscriber's side, but I don't care about that).

If I don't get any subscription message, recv() times out after 1000 ms and the thread is terminated.

Here is sample code in Python (pyzmq) to test this implementation (for the publisher I don't use threads but a while loop, running multiple publishers at the same time), and it works as I wanted:

publisher.py:

import zmq

def main():
    """ main method """

    i = 0
    while True:
        # Prepare context and publisher
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        publisher.connect("tcp://0.0.0.0:5650")
        publisher.setsockopt(zmq.RCVTIMEO, 1000)

        # Waiting for 1000ms to get a subscription
        i = i + 1
        try:
            publisher.recv()
            # Send the message
            publisher.send_multipart([b'test', bytes(str(i),'utf-8')])
        except Exception as e:
            print(e, flush=True)

        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break

if __name__ == "__main__":
    main()    

subscriber.py:

import zmq

def main():
    """ main method """

    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    subscriber.bind(uri)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    print('Subscriber bound to %s' % (uri), flush=True)

    # Receive messages
    i = 0
    while True:
        [topic, data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i, topic, data), flush=True)

if __name__ == "__main__":
    main()

My question:

Is the solution that simple? Am I missing anything (related to the slow joiner) that would cause loss of data while a subscriber is active?


Solution

  • Q : "Is the solution that simple?"

    Quite the contrary. For what was posted above, the solution is over-complicated with respect to the use-case requirements posted so far.

    a) Given the requirements above, it is possible to eliminate all the costs associated with the setup & maintenance of the ISO-OSI-L3 tcp:// transport class when communicating among threads co-located on the same host and belonging to the same process. Rather, go for the ultra-fast, stack-less, memory-mapped inproc:// transport class to avoid all of these inefficiencies. ZeroMQ API v4.0+ also offers the comfort of imposing no conditions on the { .bind() | .connect() }-order of appearance for the inproc:// transport class, so we may enjoy the utmost MEM-mapped, ultra-low-latency, zero-copy flagging of "transported" messages (without moving a single byte of in-RAM data) - cool, isn't it? (Unless you need MITM protocol-sniffing to be injected, remove the tcp:// overkill.)
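
    A minimal sketch of the inproc:// variant, assuming all publisher threads live in the same process as the subscriber and share one Context() instance (the endpoint name inproc://collector and the thread count are made up for illustration):

import threading
import zmq

# one shared Context() is a must for inproc:// endpoints
ctx = zmq.Context.instance()
ENDPOINT = "inproc://collector"          # hypothetical endpoint name

def publisher_thread(i):
    # same XPUB subscription handshake as in the tcp:// version, only the endpoint differs
    pub = ctx.socket(zmq.XPUB)
    pub.setsockopt(zmq.RCVTIMEO, 1000)
    pub.connect(ENDPOINT)
    try:
        pub.recv()                       # wait (at most 1 s) for the subscription message
        pub.send_multipart([b'test', str(i).encode()])
    except zmq.Again:
        pass                             # no subscriber showed up in time
    finally:
        pub.close()

sub = ctx.socket(zmq.SUB)
sub.bind(ENDPOINT)                       # binding first keeps older libzmq versions happy
sub.setsockopt(zmq.SUBSCRIBE, b'')

threads = [threading.Thread(target=publisher_thread, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for _ in threads:
    print(sub.recv_multipart())
for t in threads:
    t.join()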

    b) Given the requirements above, delivering a couple of messages, where the "static" SUB-side subscribes to all of them, is an enormously inefficient use of the PUB/SUB Scalable Formal Communications Pattern archetype. Your code has to pay all the costs of setting up a new SUB-instance, then crawls through setting up a valid connection (over the tcp:// transport class' stack, hopefully removed under a) ), and next wrangles to set up a new TOPIC-filter (be it operated on the SUB-side in earlier versions or on the PUB-side in newer ZeroMQ releases - all at remarkable cost, just to receive all messages, i.e. no filtering at all). The same formal service is achievable with a far more lightweight many-nodes-PUSH/PULL-on-one-node. If there is no other need for any reverse / bi-directional / more complex formal communications, this one PUSH/PULL will be able to do the job requested.
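
    A possible many-PUSH-to-one-PULL sketch under the same assumptions (shared Context(), made-up inproc://sink endpoint) - no TOPIC-filter handshake is needed, and SNDTIMEO bounds the wait whenever no receiver is present:

import threading
import zmq

ctx = zmq.Context.instance()
ENDPOINT = "inproc://sink"               # hypothetical endpoint; a tcp:// one works the same way

def pusher(i):
    push = ctx.socket(zmq.PUSH)
    push.setsockopt(zmq.SNDTIMEO, 1000)  # never block the thread for more than ~1 s
    push.connect(ENDPOINT)
    try:
        push.send_multipart([b'test', str(i).encode()])
    except zmq.Again:
        pass                             # no receiver became available in time
    finally:
        push.close()

pull = ctx.socket(zmq.PULL)
pull.bind(ENDPOINT)

threads = [threading.Thread(target=pusher, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for _ in threads:
    print(pull.recv_multipart())
for t in threads:
    t.join()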

    c) Given the requirements above, your emphasis seems to have been put on not losing messages by sending them prematurely over the connection. There are tools for ascertaining that in ZeroMQ settings, yet you take no care to use them:

    • using zmq.IMMEDIATE makes use of the blocking state of an access node in cases where no ready-made connection is working yet (or ever), so nothing gets queued onto a not-yet-completed connection (see the sketch after this list)
    • using return codes & errno (or zmq.errno() for POSIX-incompatible operating systems / Win32 and the like) handling helps your code detect & react to any and all specific situations that happen in the "network-of-autonomous-agents" throughout the whole span of its lifecycle (no matter whether the agents are actually "physically" distributed or co-located, as is the case here). Not losing control is the core responsibility here. What good is control code that self-locks into a lost-control state, where it cannot even control itself? ;)
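
    A hedged sketch of both ideas combined, assuming a PUSH sender and the same port as in the question; zmq.IMMEDIATE keeps messages off not-yet-completed connections, and the errno check tells a timeout apart from a real failure:

import zmq

ctx = zmq.Context.instance()
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.IMMEDIATE, 1)        # queue messages only onto completed connections
push.setsockopt(zmq.SNDTIMEO, 1000)      # bound the wait to ~1 s
push.setsockopt(zmq.LINGER, 0)           # do not wait on close for undelivered messages
push.connect("tcp://0.0.0.0:5650")

try:
    push.send_multipart([b'test', b'payload'])
except zmq.ZMQError as e:
    if e.errno == zmq.EAGAIN:
        print("no peer completed a connection within the timeout - nothing was sent")
    else:
        raise
finally:
    push.close()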

    d) Never use a blocking form of the { .recv() | .send() | .poll() | ... }-methods. Schoolbook examples are rather anti-patterns of what a professional signaling/messaging metaplane implementation ought to look like. Indeed never - ref. item 5) of the requirements above.
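
    For instance, on the receiving side a Poller with a short timeout keeps the control loop alive instead of parking it inside a blocking .recv() - a sketch of the question's subscriber reworked that way:

import zmq

ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.bind("tcp://0.0.0.0:5650")
sub.setsockopt(zmq.SUBSCRIBE, b'')

poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)

while True:
    events = dict(poller.poll(timeout=100))      # wait at most 100 ms, then regain control
    if sub in events:
        topic, data = sub.recv_multipart(zmq.NOBLOCK)
        print(topic, data, flush=True)
    else:
        # nothing arrived - the loop can check shutdown flags, report liveness, etc.
        pass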

    e) Better to re-use a Context()-instance, instead of treating it as a consumable/disposable as sketched above. Threads can freely share a pre-instantiated Context()-engine, avoiding the huge amount of repetitive add-on overhead incurred by re-instantiating a consumable/disposable Context() for each forked, merely short-lived, peer client thread.
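
    A small sketch of the difference (the per-iteration Context() from the question vs. one shared engine; the helper name make_publisher() is made up):

import zmq

# anti-pattern from the loop above: a brand-new engine per message
#   context = zmq.Context(); ...; context.term()    # repeated thousands of times

# re-use one engine for the whole process instead
ctx = zmq.Context.instance()                 # created once, shared by every thread

def make_publisher():
    # sockets remain one-per-thread; only the Context() engine is shared
    s = ctx.socket(zmq.XPUB)
    s.setsockopt(zmq.RCVTIMEO, 1000)
    s.connect("tcp://0.0.0.0:5650")
    return s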

    f) If anybody knows a better solution, keep us posted :o)

    Questions from comments

    a)
    Subscriber will be on another machine, so I think tcp:// is the solution.

    Sure, NP here. The { pgm:// | epgm:// | tipc:// }-transports might be interesting if you go further in the direction of higher performance levels.

    b)
    The subscriber is going to forward the messages via an XPUB socket to other subscribers. PUSH/PULL could work, but if I want to pass those subscriptions and their filters back to the initial publishers and filter some messages at the source, I have to use the PUB/SUB pattern.

    Well, that was not mentioned in the O/P. Any layering of XPUBs/XSUBs may work well; the problems are at the connection-management level.
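
    For completeness, a hedged sketch of such a forwarding layer (the second port 5651 is made up); zmq.proxy() moves messages downstream and subscription messages upstream, so the final subscribers' TOPIC-filters can propagate back toward the sources:

import zmq

ctx = zmq.Context.instance()

# upstream side: the many publishers connect here
xsub = ctx.socket(zmq.XSUB)
xsub.bind("tcp://0.0.0.0:5650")

# downstream side: further subscribers connect here
xpub = ctx.socket(zmq.XPUB)
xpub.bind("tcp://0.0.0.0:5651")              # hypothetical second port

# forwards data xsub -> xpub and subscription messages xpub -> xsub
zmq.proxy(xsub, xpub)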

    c)
    Just to clarify, not losing messages is important only when there is a subscriber. Could you explain this part a little more?

    Sure - with no subscriber available on an RTO-connected link, ready for immediate delivery "across the wire", no messages could ever be delivered (and they could be silently dropped, which is what you are trying to fight against, aren't you?). That is what zmq.IMMEDIATE can manage, via a call to the .setsockopt()-method.