Tags: python, pyzmq

How do you cleanly terminate a zmq steerable proxy in python?


Context

I am implementing a many-to-many, in-process Pub/Sub interface with Python and pyzmq that can be built up and torn down as needed in a running application. I use a proxy to connect the XSUB and XPUB sockets, specifically the zmq.proxy_steerable variant, so that I can send the TERMINATE command over a control socket to stop the proxy. However, when the proxy receives the TERMINATE command, it raises an error.

Minimum Working Example (MWE)

In the following MWE, a background thread starts the steerable proxy. After a 3-second sleep, the TERMINATE command is sent to the control socket, which causes the zmq.proxy_steerable call in the thread to raise an error.

mwe.py

import zmq
from threading import Thread

class Proxy:
    def __init__(self, context: zmq.Context):
        self.context = context
        self.thread = Thread(
            daemon=True,
            target=self._run,
            name='Proxy')
        self.thread.start()

    def _run(self):
        publish_socket = self.context.socket(zmq.XPUB)
        publish_socket.bind("inproc://subscribe")
        subscribe_socket = self.context.socket(zmq.XSUB)
        subscribe_socket.bind("inproc://publish")
        control_socket = self.context.socket(zmq.SUB)
        control_socket.connect("inproc://proxy")
        control_socket.setsockopt_string(zmq.SUBSCRIBE, '')
        zmq.proxy_steerable(
            publish_socket, subscribe_socket,
            control=control_socket)

    def stop(self):
        socket = self.context.socket(zmq.PUB)
        socket.bind("inproc://proxy")
        socket.send_string('TERMINATE')
        self.thread.join()
        self.thread = None


if __name__ == "__main__":
    import time
    context = zmq.Context()
    proxy = Proxy(context)
    time.sleep(3)
    proxy.stop()

When run, it raises the following error:

>> python mwe.py
Exception in thread Proxy:
Traceback (most recent call last):
  File "/home/bellockk/.conda/envs/zmq/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/home/bellockk/.conda/envs/zmq/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/home/bellockk/Development/zeromq/mwe.py", line 21, in _run
    zmq.proxy_steerable(
  File "zmq/backend/cython/_proxy_steerable.pyx", line 56, in zmq.backend.cython._proxy_steerable.proxy_steerable
  File "zmq/backend/cython/checkrc.pxd", line 28, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported

I believe this indicates that the control socket is receiving the TERMINATE command, but the resulting error is not very descriptive. I'm currently catching and discarding this error as a workaround, but I want to implement this properly.
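The message "Operation not supported" is how libzmq's ENOTSUP error code is reported on Linux. One situation that produces it is asking a receive-only socket type, such as SUB, to send. A minimal check, assuming pyzmq is installed (the function name is hypothetical):

```python
import zmq


def send_on_sub_socket() -> int:
    """Try to send on a SUB socket; return the resulting errno (0 if the send succeeds)."""
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    try:
        sub.send(b"TERMINATE")  # SUB sockets are receive-only
        return 0
    except zmq.ZMQError as exc:
        return exc.errno
    finally:
        sub.close(linger=0)
        ctx.term()
```

If the proxy tries to send anything back over the control socket while handling TERMINATE (a hypothesis, not confirmed by the traceback), a SUB control socket would fail with exactly this error.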


Note: Everything works fine if I let garbage collection clean everything up at program exit, but in my actual use case I need to build up and tear down a Pub/Sub interface over several iterations, so the proxy must stop cleanly and be restartable later.
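Restarting matters because an inproc endpoint name can be bound by only one socket at a time: a second bind to the same address fails with EADDRINUSE until the first socket gives it up. A minimal sketch, assuming pyzmq; the helper name is hypothetical, and it calls unbind() before close() so the name is released before the function continues:

```python
import zmq


def rebind_after_release(addr: str = "inproc://subscribe") -> int:
    """Return the errno of a clashing bind, then rebind once the name is released."""
    ctx = zmq.Context()
    first = ctx.socket(zmq.XPUB)
    second = ctx.socket(zmq.XPUB)
    clash_errno = 0
    try:
        first.bind(addr)
        try:
            second.bind(addr)  # fails: `first` still owns this inproc name
        except zmq.ZMQError as exc:
            clash_errno = exc.errno
        first.unbind(addr)  # release the name explicitly
        second.bind(addr)   # succeeds now that the name is free
    finally:
        first.close(linger=0)
        second.close(linger=0)
        ctx.term()
    return clash_errno
```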


I'm only a few days into learning ZeroMQ, so I'm probably making a simple mistake. Any help is greatly appreciated!


Solution

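  • According to the libzmq documentation, zmq_proxy_steerable expects the control socket to be a REP socket, but in your example it is a SUB socket. Replace the control-socket setup with the following code (and drop the setsockopt_string(zmq.SUBSCRIBE, '') line, since subscriptions only apply to SUB sockets):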

    control_socket = self.context.socket(zmq.REP)
    control_socket.bind("inproc://proxy")
    

    You can then stop proxy_steerable from the stop() method with:

    control_socket = self.context.socket(zmq.REQ)
    control_socket.connect("inproc://proxy")
    control_socket.send(b"TERMINATE")
    

    I tested this locally and it worked (using the ipc transport).
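Putting both changes into the question's Proxy class, a complete stop-and-restart cycle might look like the sketch below. It is not the answerer's tested code: the readiness Event, the unbind()/close(linger=0) cleanup, and the bounded poll for an optional reply on the REQ socket are assumptions added so the proxy can be rebuilt on the same inproc addresses.

```python
import threading

import zmq


class Proxy:
    """XPUB/XSUB proxy in a background thread; stop() sends TERMINATE over REQ/REP."""

    # Endpoint names taken from the question's MWE.
    SUBSCRIBE_ADDR = "inproc://subscribe"  # subscribers connect here
    PUBLISH_ADDR = "inproc://publish"      # publishers connect here
    CONTROL_ADDR = "inproc://proxy"        # control channel (REQ -> REP)

    def __init__(self, context: zmq.Context):
        self.context = context
        self._ready = threading.Event()  # assumption: set once sockets are bound
        self.thread = threading.Thread(target=self._run, name="Proxy", daemon=True)
        self.thread.start()
        self._ready.wait()  # return only after the proxy's sockets are bound

    def _run(self):
        publish_socket = self.context.socket(zmq.XPUB)
        subscribe_socket = self.context.socket(zmq.XSUB)
        control_socket = self.context.socket(zmq.REP)  # REP, not SUB
        endpoints = [
            (publish_socket, self.SUBSCRIBE_ADDR),
            (subscribe_socket, self.PUBLISH_ADDR),
            (control_socket, self.CONTROL_ADDR),
        ]
        try:
            for sock, addr in endpoints:
                sock.bind(addr)
            self._ready.set()
            # Blocks until TERMINATE is received on the control socket.
            zmq.proxy_steerable(publish_socket, subscribe_socket, control=control_socket)
        finally:
            self._ready.set()  # unblock __init__ even if a bind failed
            for sock, addr in endpoints:
                try:
                    sock.unbind(addr)  # free the inproc name for a later restart
                except zmq.ZMQError:
                    pass  # this socket was never bound
                sock.close(linger=0)

    def stop(self, timeout_ms: int = 1000):
        control_socket = self.context.socket(zmq.REQ)
        try:
            control_socket.connect(self.CONTROL_ADDR)
            control_socket.send(b"TERMINATE")
            # A reply may or may not arrive, so wait a bounded time
            # instead of blocking in recv().
            if control_socket.poll(timeout_ms, zmq.POLLIN):
                control_socket.recv()
        finally:
            control_socket.close(linger=0)
        self.thread.join()
        self.thread = None


if __name__ == "__main__":
    context = zmq.Context()
    for _ in range(2):  # build up and tear down twice on the same addresses
        proxy = Proxy(context)
        proxy.stop()
    context.term()
```

Polling rather than calling recv() directly keeps stop() from hanging if no reply comes back on the control socket; the cost is at most one timeout per stop.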