I would like to use XSUB/XPUB to enable multiple ZMQ publishers and subscribers. Everything works when I use zmq.proxy(xpub_socket, xsub_socket)
, but I need something custom because I need to write code between XSUB and XPUB that examines the messages.
Here's where I'm at:
import time
import zmq
context = zmq.Context()
address = '127.0.0.1'
pub_port = '3000'
sub_port = '3001'
# XSUB socket
xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind(f'tcp://{address}:{pub_port}')
# XPUB socket
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind(f'tcp://{address}:{sub_port}')
time.sleep(1)
# PUB socket
pub_socket = context.socket(zmq.PUB)
pub_socket.connect(f'tcp://{address}:{pub_port}')
# SUB socket
sub_socket = context.socket(zmq.SUB)
sub_socket.subscribe('')
sub_socket.connect(f'tcp://{address}:{sub_port}')
time.sleep(1)
pub_socket.send_string('test')
time.sleep(1)
print(poller.poll(0))
The values sent from the PUB socket do not reach the XSUB socket.
I read here that the first byte needs to be 1. Both of these also don't work:
pub_socket.send(b'\x01')
pub_socket.send_multipart([b'\x01', 'test'.encode('utf-8')])
What am I doing wrong here?
A PUB
socket won't send any messages to an XSUB
socket unless it has received a subscription request, which you get by calling subscribe
on a SUB
socket.
The only way those subscription messages get passed through is if you set up your XSUB/XPUB
proxy.
Here's a simple proxy that connects an XPUB
and XSUB
socket, printing out messages it receives in either direction:
import zmq
ctx = zmq.Context()
xpub_sock = ctx.socket(zmq.XPUB)
xpub_sock.bind("tcp://127.0.0.1:3000")
xsub_sock = ctx.socket(zmq.XSUB)
xsub_sock.bind("tcp://127.0.0.1:3001")
poller = zmq.Poller()
poller.register(xpub_sock, zmq.POLLIN)
poller.register(xsub_sock, zmq.POLLIN)
while True:
socks = dict(poller.poll())
if xpub_sock in socks and socks[xpub_sock] == zmq.POLLIN:
msg = xpub_sock.recv_multipart()
print("(sub)", msg)
xsub_sock.send_multipart(msg)
elif xsub_sock in socks and socks[xsub_sock] == zmq.POLLIN:
msg = xsub_sock.recv_multipart()
print("(pub)", msg)
xpub_sock.send_multipart(msg)
If I connect to this with an PUB
socket, like this...
import zmq
import time
ctx = zmq.Context()
pub_sock = ctx.socket(zmq.PUB)
pub_sock.connect("tcp://localhost:3001")
while True:
pub_sock.send_string("test")
time.sleep(1)
...I won't see any messages arriving at the XSUB
socket, because
there are no active subscriptions. However, if I connect a SUB
socket to the XPUB
socket and set a subscription...
import zmq
ctx = zmq.Context()
sub_sock = ctx.socket(zmq.SUB)
sub_sock.connect("tcp://localhost:3000")
sub_sock.subscribe("")
while True:
msg = sub_sock.recv()
print(msg)
...then I will start to see messages passing from the PUB
socket to
the XSUB
socket, and then from the XPUB
socket to the SUB
socket.