Tags: python, zeromq, python-multiprocessing, pyzmq, interprocess

How to publish and subscribe the latest message correctly using pyzmq?


I have a process A that publishes a message constantly and processes B and C subscribe to the topic and get the latest message published by the publisher in process A.

So I set zmq.CONFLATE on both the publisher and the subscribers. However, I found that one of the subscribers was not able to receive any messages.

import time
from multiprocessing import Process

import zmq


def publisher(sleep_time=1.0, port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    # CONFLATE on the PUB socket: removing this line made both subscribers work.
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.bind("tcp://*:%s" % port)
    print("Running publisher on port: ", port)

    while True:
        # Publish the current local time once per sleep_time seconds.
        localtime = time.asctime(time.localtime(time.time()))
        string = "Message published time: {}".format(localtime)
        socket.send_string(string)
        time.sleep(sleep_time)


def subscriber(name="sub", sleep_time=1, ports="5556"):
    print("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))

    context = zmq.Context()
    print("Connecting to publisher with ports %s" % ports)
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    # An empty prefix subscribes to every message.
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    socket.connect("tcp://localhost:%s" % ports)

    while True:
        message = socket.recv()
        localtime = time.asctime(time.localtime(time.time()))
        print("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
        time.sleep(sleep_time)


if __name__ == "__main__":
    Process(target=publisher).start()
    Process(target=subscriber, args=("SUB1", 1.2)).start()
    Process(target=subscriber, args=("SUB2", 1.1)).start()

I tried removing socket.setsockopt(zmq.CONFLATE, 1) from the publisher, and that seemed to solve the problem. Both subscribers in processes B and C could receive messages, and the messages seemed to be the latest ones.

I'm trying to find out why setting CONFLATE on the publisher caused this problem. I could not find any information about it. Does anyone know what causes this behavior?

Also, for one publisher with multiple subscribers, what is the correct code setup so that each subscriber always gets the latest message?


Solution

  • It's most likely a timing issue. The ZMQ_CONFLATE socket option limits the inbound and outbound queues to 1 message each.

    The way PUB/SUB works is that the subscriber sends a subscription message to the publisher when you set the ZMQ_SUBSCRIBE option. If you start both subscribers at the same time, then it's possible that one of the subscription messages that arrived on the publisher's queue will be discarded.

    Try adding a sleep between starting each subscriber.

    From the zeromq docs:

    If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.

    I am not saying this is the solution to your problem, but if that is the case, we may need to post a change to libzmq to make the conflate option more granular, so you can choose whether conflate should be applied to the inbound or the outbound queue.
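
For the one-publisher, many-subscribers case asked about in the question, here is a minimal sketch of the configuration the question itself reports as working: CONFLATE is set only on the SUB sockets, and it is set before connect(), since most socket options only apply to connections made afterwards. This is an illustration under assumptions, not a confirmed fix for the root cause. The names make_subscriber, slow_subscriber and run_demo are hypothetical, as are the integer payloads, the intervals, and the RCVTIMEO setting (added only so the demo can exit). It also uses threads and a random local port instead of separate processes on port 5556, to keep the example self-contained.

```python
import threading
import time

import zmq


def make_subscriber(context, endpoint):
    """Create a SUB socket that keeps only the newest message (hypothetical helper)."""
    socket = context.socket(zmq.SUB)
    # Set CONFLATE before connect(); it only affects later connections.
    socket.setsockopt(zmq.CONFLATE, 1)
    # Demo-only assumption: give up after 1 s of silence so the loop can end.
    socket.setsockopt(zmq.RCVTIMEO, 1000)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    socket.connect(endpoint)
    return socket


def slow_subscriber(context, endpoint, sleep_time, received):
    """Read one message, then sleep, so the publisher outpaces this reader."""
    socket = make_subscriber(context, endpoint)
    try:
        while True:
            try:
                received.append(int(socket.recv_string()))
            except zmq.Again:
                break  # nothing arrived within RCVTIMEO; publisher is done
            time.sleep(sleep_time)
    finally:
        socket.close(linger=0)


def run_demo(n_messages=100, publish_interval=0.01):
    """Publish n_messages counters and return what each subscriber saw."""
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)  # note: no CONFLATE on the publisher
    port = publisher.bind_to_random_port("tcp://127.0.0.1")
    endpoint = "tcp://127.0.0.1:%d" % port

    results = {"SUB1": [], "SUB2": []}
    threads = [
        threading.Thread(target=slow_subscriber,
                         args=(context, endpoint, 0.12, results["SUB1"])),
        threading.Thread(target=slow_subscriber,
                         args=(context, endpoint, 0.11, results["SUB2"])),
    ]
    for thread in threads:
        thread.start()

    # Publish much faster than the subscribers read.
    for i in range(n_messages):
        publisher.send_string(str(i))
        time.sleep(publish_interval)

    for thread in threads:
        thread.join()
    publisher.close(linger=0)
    context.term()
    return results


if __name__ == "__main__":
    for name, values in run_demo().items():
        print(name, values)
```

Each subscriber should print an increasing list of counters with gaps in it: messages that arrived while it was sleeping were replaced by newer ones rather than queued. Note that CONFLATE does not support multi-part messages, so this pattern only fits single-frame payloads.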