Search code examples
socketszeromqdistributed-computingpyzmqlow-latency

Pyzmq SUB doesn't receive messages unless I use bind


I have 3 processes, let's call them host and worker1 and worker2. I want worker1 and worker2 to be able to communicate with each other directly via PUB/SUB sockets (with host interjecting intermittently), so I have the following setup:

# host
socket = ctx.Socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')

# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')

# worker2
socket = ctx.Socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()

As of now, this setup doesn't work. worker1 sends fine, but worker2 never seems to receive the message. However, if I now change the setup to this:

# host
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')

# worker1
socket = ctx.Socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.connect(b'worker1')

# worker2
socket = ctx.Socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()

It works just fine. However, if I also bind in host, it stops working again.

Why is this? What happens if I now have workerN which also needs to subscribe to worker1, how can I bind from all the processes? What are these bind/connect semantics? Isn't host, being the long-lived process, doing the right thing by binding, and if so, why is worker2 failing to receive when it is connecting?


MWE: https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a

import zmq
import time
from multiprocessing import Process

addr = 'ipc:///tmp/test_zmq'

def worker1():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(addr)
    while True:
        sock.send(b'worker1')
        print('worker1 sent')
        time.sleep(1)

def worker2():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(addr) # Change this to bind
    sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
    while True:
        sock.recv()
        print('worker2 heard')

def main():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(addr) # Change this to connect (or remove entirely)

    p1 = Process(target=worker1)
    p2 = Process(target=worker2)
    p1.start()
    p2.start()

    p1.join()

if __name__ == '__main__':
    main()

Solution

  • Q : "Isn't host, being the long-lived process, doing the right thing by binding,...?"

    Well, no one can tell.
    There is no supporting evidence for any such claim ( "being the long-lived process" ), the less for any such deducted assumption ( "... and if so, why is worker2 failing to receive when it is connecting?" )

    What we can tell is,
    that worker2 may try to use either of the methods, be it a { .bind() | .connect() }, successfully, yet still gets no warranty ever ( a wonderful Zen-of-Zero ) whether it will or will not POSACK on receive. That depends on many other factors, the more in systems:

    • if / when / how was the TOPIC-list managed ( using and the order of call(s) to the .setsockopt( zmq.SUBSCRIBE, ... ) method ),
    • if PUB-side agents did ever send any such message that was actually matching to any topic actively subscribed to or not,
    • if default and / or configured resources were actually all in a state, which was sufficient for accepting any such potentially deliverable message for a transport on a .send()-er side, having all necessary means of transport from sender-side Context()-instance to the hands of the intended receiving-side Context()-instance and, finally, for a local delivery thereof via a .recv()-method on any intended receiver-side(s) or not,

      so indeed not easy to say in general. Your code suffers from several principal collisions:

    The worker1, in a PUB-side role will actually never deliver a bit in the first example above.


    Q : "Why is this?"

    Asking a PUB to setup a transport-link towards another PUB is not a legitimate ZeroMQ Formal Communication Archetype pattern - such a call to .connect()-method, responsible for asking for such a setup, ought get inspected for returning an error and the rest of the story is like observing two people, a PUB-in-host and a PUB-in-worker1, both trying to shout as much a each can into to their microphone, yet no one ever listening to a speaker, so never hearing a bit from anyone else.

    Doable, yet not functional, is it?


    Q : "What happens if I now have workerN which also needs to subscribe to worker1, how can I bind from all the processes?"

    We need not .bind(), unless we want to.

    If workerN wants to subscribe to worker1, it may .bind(), yet worker1 has no clue there has appeared any new workerN ( to which it might want to .connect() ), does it? So the .connect() is often let for the dynamically changing SUB-side N-ary cohort of the 1:N-setup ( actually being M:N, in principle ) - having the PUB-side on a known transport-class(es ... yes, can have more than one AccessPoints using more than one, ... yes, multiple transport-classes - all may get used from the one and the very same PUB-agent for communication with one or many remote SUB-agents - isn't that cool? ) - as the .bind()-(s) established a know point of access for the dynamic cohort of N-agents trying to .connect() at some unknown time in the future.