How to abort context.socket.recv() the right way in ZeroMQ?


I have a small program with a separate thread that waits for ZeroMQ messages. I am using ZeroMQ's PUB/SUB pattern.

Currently I am aborting that thread by setting a variable "cont_loop" to False.

But I discovered that when no messages arrive at the ZeroMQ subscriber, I cannot exit the thread (without taking down the whole program).

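import zmq
from threading import Thread

class Subscriber(Thread):  # class line is missing in the post; the name is assumed
    def __init__(self):
        Thread.__init__(self)
        self.cont_loop = True

    def abort(self):
        # Must use the same attribute name that run() checks.
        self.cont_loop = False

    def run(self):
        zmq_context = zmq.Context()
        zmq_socket = zmq_context.socket(zmq.SUB)
        zmq_socket.bind("tcp://*:%s" % 5556)
        zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: subscribe to everything
        while self.cont_loop:
            # Blocks here until a message arrives, so the flag is not re-checked.
            data = zmq_socket.recv()
            print("Message: " + data.decode())
        zmq_socket.close()
        zmq_context.term()
        print("exit")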

I tried moving socket.close() and context.term() into the abort method so that it shuts down the subscriber, but this killed the whole program.

What is the correct way to shut down the above program?


Solution

  • Q: What is the correct way to ... ?

    A: There are many ways to achieve the set goal. Let me pick just one, as a mock-up example on how to handle distributed process-to-process messaging.

    First, assume that a typical software design has tasks of differing priority. Some are high, some are low, and some are so low that their execution can be deferred, which leaves the scheduler more time for the sub-tasks that cannot wait.

    That said, let's look at your code. The SUB-side call to .recv(), as used, does two things. The visible one: it performs a RECEIVE operation on a ZeroMQ socket with SUB behaviour. The less visible one: it hangs until it gets something "compatible" with the current state of that SUB behaviour ( more on setting this later ).

    This means it BLOCKS from the moment .recv() is called UNTIL some unknown, locally uncontrollable coincidence of states and events delivers a ZeroMQ message whose content is "compatible" with the locally pre-set state of this (still blocking) SUB instance.

    That may take ages.

    This is exactly why .recv() is better used inside a control loop, where the surrounding code gets both the chance and the responsibility to do what you want ( including abort-related operations and a fair, graceful termination with a proper release of resources ).
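
    One possible shape of such a control loop is sketched below. It uses zmq.Poller with a timeout, which is a different technique from the non-blocking .recv() described next, and the class name PollingSubscriber, the endpoint, and the poll_ms parameter are all assumptions made for illustration. The point it shows is only that the loop wakes up regularly, so an abort request is noticed even when no message ever arrives, and that the socket is created and closed in the same thread.

```python
import threading

import zmq


class PollingSubscriber(threading.Thread):
    """Hypothetical SUB thread that wakes up regularly to check a stop flag."""

    def __init__(self, endpoint="tcp://127.0.0.1:5556", poll_ms=100):
        super().__init__()
        self.endpoint = endpoint      # assumed address, adjust as needed
        self.poll_ms = poll_ms        # upper bound on each wait, in milliseconds
        self.stop_event = threading.Event()
        self.received = []

    def abort(self):
        # Safe to call from any thread: it only sets a flag.
        self.stop_event.set()

    def run(self):
        # The socket is created, used and closed in this thread only.
        context = zmq.Context()
        socket = context.socket(zmq.SUB)
        socket.setsockopt(zmq.LINGER, 0)
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        socket.bind(self.endpoint)
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)
        try:
            while not self.stop_event.is_set():
                # Wait at most poll_ms for a message, then re-check the flag.
                events = dict(poller.poll(self.poll_ms))
                if socket in events:
                    self.received.append(socket.recv())
        finally:
            socket.close()
            context.term()
```

    Typical use would be sub = PollingSubscriber(); sub.start(); and later sub.abort(); sub.join(). The thread then exits within roughly one poll interval.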

    The receive step becomes .recv( flags = zmq.NOBLOCK ), placed inside a try: / except: block; when no message is waiting, pyzmq raises zmq.Again instead of blocking. This way your local process does not lose control over the stream of events ( including the NOP, the case where nothing arrives ).
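
    A minimal sketch of that non-blocking receive, under stated assumptions: the helper name receive_until_stopped, the endpoint, the idle_sleep pause and the use of a threading.Event as the abort flag are not from the question and are only illustrative.

```python
import threading
import time

import zmq


def receive_until_stopped(stop_event, endpoint="tcp://127.0.0.1:5556",
                          idle_sleep=0.01):
    """Hypothetical helper: read from a SUB socket without blocking in recv()."""
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.LINGER, 0)
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    socket.bind(endpoint)
    messages = []
    try:
        while not stop_event.is_set():
            try:
                # Returns immediately: either a message or zmq.Again.
                messages.append(socket.recv(flags=zmq.NOBLOCK))
            except zmq.Again:
                # Nothing queued right now; pause briefly instead of spinning.
                time.sleep(idle_sleep)
    finally:
        # Resources are released in the same thread that used them.
        socket.close()
        context.term()
    return messages
```

    Run it as the target of a thread and call stop_event.set() from the main thread to end the loop. The short sleep is a design choice to keep the loop from consuming a full CPU core while idle.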

    The best next step?

    Take your time and read through a great book of gems, "Code Connected, Volume 1", which Pieter HINTJENS, co-father of ZeroMQ, has published ( also as PDF ).

    Many of his thoughts, and the errors to avoid that he shared with us, are indeed worth your time.

    Enjoy the powers of ZeroMQ. It is very powerful and worth mastering top-down.