I am implementing a many-to-many in process Pub/Sub interface with python zmq that can be built up and torn down as needed in a running application. I am using a proxy to connect the XSUB
and XPUB
sockets and am utilizing the zmq.proxy_steerable
variant so that I can use a control socket to send the TERMINATE
command to stop the proxy. When I do this I am getting an error when the TERMINATE
command is received by the proxy.
In the following MWE, a background thread is used to start the steerable proxy. After a brief sleep of 3 seconds, the TERMINATE
signal is sent to the control socket, which causes the zmq.proxy_steerable
line in the thread to raise an error.
mwe.py
import zmq
from threading import Thread
class Proxy:
def __init__(self, context: zmq.Context):
self.context = context
self.thread = Thread(
daemon=True,
target=self._run,
name='Proxy')
self.thread.start()
def _run(self):
publish_socket = self.context.socket(zmq.XPUB)
publish_socket.bind("inproc://subscribe")
subscribe_socket = self.context.socket(zmq.XSUB)
subscribe_socket.bind("inproc://publish")
control_socket = self.context.socket(zmq.SUB)
control_socket.connect("inproc://proxy")
control_socket.setsockopt_string(zmq.SUBSCRIBE, '')
zmq.proxy_steerable(
publish_socket, subscribe_socket,
control=control_socket)
def stop(self):
socket = self.context.socket(zmq.PUB)
socket.bind("inproc://proxy")
socket.send_string('TERMINATE')
self.thread.join()
self.thread = None
if __name__ == "__main__":
import time
context = zmq.Context()
proxy = Proxy(context)
time.sleep(3)
proxy.stop()
Which when run, raises the following error.
>> python mwe.py
Exception in thread Proxy:
Traceback (most recent call last):
File "/home/bellockk/.conda/envs/zmq/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/home/bellockk/.conda/envs/zmq/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/home/bellockk/Development/zeromq/mwe.py", line 21, in _run
zmq.proxy_steerable(
File "zmq/backend/cython/_proxy_steerable.pyx", line 56, in zmq.backend.cython._proxy_steerable.proxy_steerable
File "zmq/backend/cython/checkrc.pxd", line 28, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported
This I believe indicates the TERMINATE
is being received by the control socket, but is raising a very non-descriptive error. I'm currently catching this error and discarding it to work around the issue, but want to implement it properly.
Note: Everything works fine if I let garbage collection take care of everything at program close, but for my actual use case I need to build up and tear down a Pub/Sub interface over several iterations, so I need to be able to have the proxy cleanly stop and be able to restart later.
I am just a few days into learning zmq
, so I'm likely making a simple mistake. Any help is greatly appreciaited!
According to libzmq documentation, proxy_steerable
expects the REP
socket type for the control socket, but in your example, it's a SUB
socket.
Just replace it with the following code:
control_socket = self.context.socket(zmq.REP)
control_socket.bind("inproc://proxy")
And then you can stop proxy_steerable
with the following code:
control_socket = self.context.socket(zmq.REQ)
control_socket.connect("inproc://proxy")
control_socket.send("TERMINATE".encode())
I tested it locally, and it worked (with ipc
transport).