Search code examples
pythonpython-asynciopyzmq

ZMQ PUB/SUB asyncio doesn't receive any messages


I'm doing work on a pub/sub zmq receiver, one pub, multiple subs which can receive messages from one publisher in an asyncio environment. The receiver which is asyncio based doesn't receive any messages.

Snippet out of the receiver program :

async def zmq_listener(sub, stop_event):
    global mode
    while not stop_event.is_set():
        msg = await sub.recv_multipart()
        print(msg)
        if len(msg) == 1:
            print(msg[0].decode())

######################################
### MAIN PROGRAM STARTS HERE 
async def main():
    tasks = []
    tasks.append(asyncio.create_task(other_routine1(abc, stop_event)))

    if doZMQ:
        print("ZMQ Mode is enabled")
        ctx = zmq.asyncio.Context()
        sub = ctx.socket(zmq.SUB)
        ip = 'tcp://127.0.0.1:5559'
        sub.connect(ip)
        sub.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all topics        
        tasks.append(asyncio.create_task(zmq_listener(sub, stop_event)))

    await asyncio.gather(*tasks)

stop_event = asyncio.Event()

try:
    uvloop.install()
    asyncio.run(main(), debug=False)

except KeyboardInterrupt:
    print("Program interrupted by user")
    asyncio.run(stop())

The send application to test the receiver is following :

import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)

pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])

Solution

  • The publisher is sending messages before the subscriber has had a chance to connect and subscribe to the messages, lets try to add a short delay like 1 second.

    import zmq
    import time
    
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    ip = 'tcp://127.0.0.1:5559'
    pub.bind(ip)
    
    time.sleep(1)
    
    pub.send_multipart([b"SHOW"])
    pub.send_multipart([b"P1"])