I have 3 processes, let's call them host, worker1 and worker2. I want worker1 and worker2 to be able to communicate with each other directly via PUB/SUB sockets (with host interjecting intermittently), so I have the following setup:
# host
socket = ctx.socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')
# worker1
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')
# worker2
socket = ctx.socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
As of now, this setup doesn't work. worker1 sends fine, but worker2 never seems to receive the message. However, if I now change the setup to this:
# host
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
# worker1
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')
# worker2
socket = ctx.socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
It works just fine. However, if I also bind in host, it stops working again.
Why is this? What happens if I now have workerN which also needs to subscribe to worker1, how can I bind from all the processes? What are these bind/connect semantics? Isn't host, being the long-lived process, doing the right thing by binding, and if so, why is worker2 failing to receive when it is connecting?
MWE: https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a
import zmq
import time
from multiprocessing import Process

addr = 'ipc:///tmp/test_zmq'

def worker1():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(addr)
    while True:
        sock.send(b'worker1')
        print('worker1 sent')
        time.sleep(1)

def worker2():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(addr) # Change this to bind
    sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
    while True:
        sock.recv()
        print('worker2 heard')

def main():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(addr) # Change this to connect (or remove entirely)
    p1 = Process(target=worker1)
    p2 = Process(target=worker2)
    p1.start()
    p2.start()
    p1.join()

if __name__ == '__main__':
    main()
Q : "Isn't host, being the long-lived process, doing the right thing by binding,...?"
Well, no one can tell.
There is no supporting evidence for any such claim ("being the long-lived process"), still less for the assumption deduced from it ("... and if so, why is worker2 failing to receive when it is connecting?").
What we can tell is that worker2 may use either of the methods, be it .bind() or .connect(), successfully, yet still gets no warranty whatsoever (a wonderful Zen-of-Zero) that a .recv() will ever actually deliver a message. That depends on many other factors, all the more so in distributed-computing systems:
- how the TOPIC-list is managed (which calls to the .setsockopt( zmq.SUBSCRIBE, ... ) method are made, and in what order),
- whether any PUB-side agent ever sent a message that actually matched a topic actively subscribed to,
- whether the .send()-er side has all the necessary means of transport from the sender-side Context()-instance into the hands of the intended receiving-side Context()-instance and, finally, for local delivery via a .recv() method on the intended receiver side(s).
The worker1, in a PUB-side role, will actually never deliver a bit in the first example above.
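The first two factors can be seen in a minimal sketch, assuming pyzmq and an in-process inproc:// endpoint instead of the ipc:// one from the question (the function name, the endpoint and the message payloads are made up for the illustration). A SUB subscribed to the prefix b'worker1' only ever sees messages that start with that prefix, and only once its subscription has reached the PUB, which is why the sketch keeps sending until something arrives or a deadline passes:

```python
import time
import zmq


def demo_topic_filter(addr="inproc://topic-demo"):
    """Return the first message a SUB subscribed to b'worker1' receives."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind(addr)

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"worker1")  # prefix match on the payload
    sub.connect(addr)

    received = []
    deadline = time.time() + 5.0
    # Keep publishing: anything sent before the subscription is known to the
    # PUB side is dropped without any error.
    while not received and time.time() < deadline:
        pub.send(b"worker2 ignored")  # does not match the subscribed prefix
        pub.send(b"worker1 hello")    # matches the subscribed prefix
        if sub.poll(timeout=100):     # wait up to 100 ms for a delivery
            received.append(sub.recv())

    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return received


if __name__ == "__main__":
    print(demo_topic_filter())
```

The non-matching message never shows up on the SUB side; it is simply filtered out, with no error reported on either end.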
Q : "Why is this?"
Asking a PUB to set up a transport link towards another PUB is not a legitimate ZeroMQ Formal Communication Archetype pattern. The call to the .connect() method that asks for such a setup ought to be inspected for returning an error. The rest of the story is like observing two people, a PUB in host and a PUB in worker1, both shouting as much as each can into their microphone, yet no one ever listening to a speaker, so never hearing a bit from anyone else.
Doable, yet not functional, is it?
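A small sketch of the two-microphones situation, assuming pyzmq and a TCP loopback port in place of the ipc:// path from the question (the function name is invented for the example). As far as I can tell, .connect() and .send() both return without complaint here, because the link is set up in the background and a PUB drops whatever it has no subscriber for, while a PUB socket offers no receive side at all:

```python
import zmq


def pub_to_pub():
    """Connect a PUB to another PUB; return True if the bound PUB could receive."""
    ctx = zmq.Context()
    host_pub = ctx.socket(zmq.PUB)
    port = host_pub.bind_to_random_port("tcp://127.0.0.1")

    worker_pub = ctx.socket(zmq.PUB)
    worker_pub.connect(f"tcp://127.0.0.1:{port}")  # no exception raised here
    worker_pub.send(b"worker1")  # returns normally; there is no subscriber to get it

    try:
        host_pub.recv(zmq.NOBLOCK)
        can_receive = True
    except zmq.ZMQError:
        # A PUB socket cannot receive, so recv() fails instead of delivering.
        can_receive = False
    finally:
        worker_pub.close(linger=0)
        host_pub.close(linger=0)
        ctx.term()
    return can_receive


if __name__ == "__main__":
    print("host PUB can receive:", pub_to_pub())
```

So nothing visibly goes wrong on the sending side, which matches the "worker1 sends fine" observation in the question.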
Q : "What happens if I now have workerN which also needs to subscribe to worker1, how can I bind from all the processes?"
We need not .bind(), unless we want to.
If workerN wants to subscribe to worker1, it may .bind(), yet worker1 has no clue that a new workerN has appeared (to which it might want to .connect()), does it? So the .connect() is often left to the dynamically changing SUB-side N-ary cohort of the 1:N setup (actually M:N, in principle), with the PUB side sitting on a known address of some transport class. (Yes, a PUB can have more than one access point, and yes, over more than one transport class; all may be used from one and the very same PUB agent for communication with one or many remote SUB agents. Isn't that cool?) The .bind() call(s) establish a known point of access for the dynamic cohort of N agents trying to .connect() at some unknown time in the future.
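The layout described above could look roughly like this, as a sketch under a few assumptions: pyzmq, one PUB standing in for worker1 that binds on two transports (an inproc:// name and a random TCP loopback port, both chosen only for the example), and N SUB sockets standing in for the workerN cohort that connect afterwards. All names are hypothetical, and everything runs in one process just to keep the example short:

```python
import time
import zmq


def fan_out(n_subscribers=3):
    """One PUB binds on two endpoints; N SUBs connect. Return who heard it."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://worker1-feed")  # first access point
    port = pub.bind_to_random_port("tcp://127.0.0.1")  # second access point
    endpoints = ["inproc://worker1-feed", f"tcp://127.0.0.1:{port}"]

    subs = []
    for i in range(n_subscribers):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"worker1")
        sub.connect(endpoints[i % len(endpoints)])  # alternate the transports
        subs.append(sub)

    heard = [False] * n_subscribers
    deadline = time.time() + 5.0
    # Publish repeatedly: late joiners miss whatever was sent before their
    # subscription arrived at the PUB side.
    while not all(heard) and time.time() < deadline:
        pub.send(b"worker1 tick")
        for i, sub in enumerate(subs):
            if not heard[i] and sub.poll(timeout=50):
                sub.recv()
                heard[i] = True

    for sub in subs:
        sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return heard


if __name__ == "__main__":
    print(fan_out(3))
```

The PUB side never needs to know how many subscribers exist or when they show up; each new workerN only has to know one of the bound addresses.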