zeromq pyzmq

PUB/SUB: sub on listen socket / pub on connecting not working properly


I want to use ZeroMQ in a Python project for IPC. One process should receive control commands from other processes. Therefore I decided to follow the PUB/SUB example, but with switched roles.

One process uses zmq.SUB on the listener side,
other processes use zmq.PUB on the connector side.

My problem is that not everything sent on the PUB side is received on the SUB side.

Here is the code:

import zmq
import asyncio

IPC_SOCK = "/tmp/tmp.sock"

class DataObj:
    def __init__(self) -> None:
        self.value = 0

    def __str__(self) -> str:
        return f'DataObj.value: {self.value}'

async def server():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind(f'ipc://{IPC_SOCK}')
    socket.subscribe("")

    while True:
            try:
                obj = socket.recv_pyobj(flags=zmq.NOBLOCK)
                print(f'<-- {obj}')
                await asyncio.sleep(0.1)
            except zmq.Again:
                pass

            await asyncio.sleep(0.1)

async def client():
    print("Waiting for server to be come up")
    await asyncio.sleep(2)

    context = zmq.Context()
    socket = context.socket(zmq.PUB)

    socket.connect(f'ipc://{IPC_SOCK}')


    data_obj = DataObj()
    data_obj.value = 42

    print("Sending object once")
    socket.send_pyobj(data_obj)
    print(f"--> {data_obj}")
    print("Send object --> Not Received!")

    print("Sending object twice")
    for i in range(2):
        data_obj.value += 1
        socket.send_pyobj(data_obj)
        print(f"--> {data_obj}")
        await asyncio.sleep(0.1)

    print("Send both objects --> Received only once")
    

async def main():
    t_server = asyncio.create_task(server())
    t_client = asyncio.create_task(client())

    await t_client
    await t_server

   
if __name__ == "__main__":
    asyncio.run(main())

This is the output it generates (on my desktop Linux (Arch), and the same on my Raspberry Pi 4 (Raspbian)):

Waiting for server to be come up
Sending object once
--> DataObj.value: 42
Send object --> Not Received!
Sending object twice
--> DataObj.value: 43
--> DataObj.value: 44
<-- DataObj.value: 44
Send both objects --> Received only once

Does anyone have an idea how to solve the problem? Switching the transport from ipc:// to tcp:// doesn't make any difference, so I assume the problem lies elsewhere.

Is it generally allowed to do pub on a connector, and sub on the listener side?


Solution

  • There are several problems here...

    1. You're creating multiple ZMQ contexts. You should only be creating a single context, and then all your tasks should allocate sockets from that context.

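    2. In your main task, you're first await-ing on the client, then on the server. Both tasks were already started by asyncio.create_task, so they do run concurrently (your output shows the server receiving while the client is still sending) and the await order is not what loses messages. Still, asyncio.gather states the intent more directly.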

    3. ZeroMQ has support for asyncio, so if you're writing asyncio client/server tasks, you should probably use asyncio sockets as well.

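    4. You probably shouldn't be using PUB/SUB sockets. The pub/sub model is not reliable; a PUB socket silently discards messages if no subscriber is connected yet, if the subscriber can't keep up, etc. In particular, anything sent before the connection is established and the subscription has reached the publisher is dropped, which is why the messages sent right after connect() never arrive. For what you're doing, you might be better off with REQ/REP sockets.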
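On point 2, it can help to check how asyncio actually schedules tasks. The following is a minimal sketch using only the standard library; the `server` and `client` coroutines here are hypothetical stand-ins that append to a shared log instead of touching ZeroMQ. It records the order in which the two coroutines run, once with `asyncio.create_task` plus sequential awaits and once with `asyncio.gather`.

```python
import asyncio


async def server(log):
    # Stand-in for the receiving loop: three "ticks" instead of an endless loop.
    for tick in range(3):
        log.append(f"server tick {tick}")
        await asyncio.sleep(0.01)


async def client(log):
    # Stand-in for the sending side.
    for msg in range(3):
        log.append(f"client msg {msg}")
        await asyncio.sleep(0.01)


async def main_create_task():
    log = []
    t_server = asyncio.create_task(server(log))
    t_client = asyncio.create_task(client(log))
    # Awaiting the client first does not hold the server back:
    # both tasks are already scheduled on the event loop.
    await t_client
    await t_server
    return log


async def main_gather():
    log = []
    # Same concurrency, expressed in one call.
    await asyncio.gather(server(log), client(log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main_create_task()))
    print(asyncio.run(main_gather()))
```

In both variants the log interleaves server and client entries, because a task created with `asyncio.create_task` starts running as soon as the event loop gets control, whichever task is awaited first.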

    I've modified your code, taking into account the above comments, to produce the following example (note that there are multiple clients to demonstrate that things work as expected in that situation):

    import zmq
    import zmq.asyncio
    import asyncio
    
    IPC_SOCK = "/tmp/tmp.sock"
    context = zmq.asyncio.Context()
    
    
    class DataObj:
        def __init__(self, value=0) -> None:
            self.value = value
    
        def __str__(self) -> str:
            return f"DataObj.value: {self.value}"
    
        def __iadd__(self, val):
            # In-place add: `data_obj += 1` mutates this object and returns it.
            self.value += val
            return self
    
    
    async def server():
        socket = context.socket(zmq.REP)
        socket.bind(f"ipc://{IPC_SOCK}")
    
        print("Server is running")
    
        while True:
            obj = await socket.recv_pyobj()
            # A REP socket must send a reply before it can receive the next request.
            await socket.send(b"")
            print(f"<-- {obj}")
    
    
    async def client(id=0):
        socket = context.socket(zmq.REQ)
        socket.connect(f"ipc://{IPC_SOCK}")
    
        data_obj = DataObj((id * 100) + 42)
    
        print(f"client {id} sending object once")
        await socket.send_pyobj(data_obj)
        await socket.recv()
        print(f"{id} --> {data_obj}")
    
        print(f"client {id} sending object twice")
        for _ in range(2):
            data_obj += 1
            await socket.send_pyobj(data_obj)
            await socket.recv()
            print(f"{id} --> {data_obj}")
    
    
    async def main():
        await asyncio.gather(server(), client(1), client(2))
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Running this code produces:

    Server is running
    client 1 sending object once
    client 2 sending object once
    <-- DataObj.value: 142
    <-- DataObj.value: 242
    1 --> DataObj.value: 142
    client 1 sending object twice
    2 --> DataObj.value: 242
    client 2 sending object twice
    <-- DataObj.value: 143
    <-- DataObj.value: 243
    1 --> DataObj.value: 143
    2 --> DataObj.value: 243
    <-- DataObj.value: 144
    <-- DataObj.value: 244
    1 --> DataObj.value: 144
    2 --> DataObj.value: 244
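
If PUB/SUB is still preferred: binding the SUB socket and connecting the PUB sockets is allowed, since in ZeroMQ the bind/connect direction is independent of the socket type. The sketch below, which assumes pyzmq with `zmq.asyncio`, keeps those roles, uses one shared context, and waits briefly after `connect()` so the subscription can reach the publisher before the first send. The `settle` delay, the TCP loopback endpoint with a random port, and the function names are assumptions for illustration. A fixed sleep is a crude workaround, not a delivery guarantee.

```python
import asyncio

import zmq
import zmq.asyncio


async def subscriber(sock, count):
    # Collect `count` objects from the already-bound SUB socket.
    received = []
    for _ in range(count):
        received.append(await sock.recv_pyobj())
    return received


async def publisher(ctx, endpoint, values, settle=0.5):
    sock = ctx.socket(zmq.PUB)
    sock.connect(endpoint)
    # Assumed workaround: give the connection and the subscription
    # time to be set up, otherwise the first messages are dropped.
    await asyncio.sleep(settle)
    for value in values:
        await sock.send_pyobj(value)


async def demo():
    ctx = zmq.asyncio.Context()  # one context shared by all sockets
    try:
        sub = ctx.socket(zmq.SUB)
        port = sub.bind_to_random_port("tcp://127.0.0.1")
        sub.subscribe(b"")  # empty prefix: receive everything
        endpoint = f"tcp://127.0.0.1:{port}"
        received, _ = await asyncio.gather(
            # Time out instead of hanging forever if a message is lost.
            asyncio.wait_for(subscriber(sub, 3), timeout=5),
            publisher(ctx, endpoint, [42, 43, 44]),
        )
        return received
    finally:
        # Close all sockets of this context and terminate it.
        ctx.destroy(linger=0)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Setting `settle=0` in `publisher` is a quick way to check whether the first messages go missing on a given machine. If every command must arrive, REQ/REP as shown above (or an explicit acknowledgement) is the safer choice.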